Use Guice magic instead of trying to serialize UniqueValueTable as an Akka creation prop.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e0d95bf0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e0d95bf0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e0d95bf0 Branch: refs/heads/release-2.1.1 Commit: e0d95bf0a0e9ec1cb622eb752ca895debfdbe465 Parents: b90985c Author: Dave Johnson <[email protected]> Authored: Thu Jun 16 11:11:02 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Thu Jun 16 11:11:02 2016 -0400 ---------------------------------------------------------------------- .../uniquevalues/ClusterSingletonRouter.java | 28 +++++++++-- .../uniquevalues/GuiceActorProducer.java | 46 ++++++++++++++++++ .../uniquevalues/UniqueValueActor.java | 25 ++++++++-- .../uniquevalues/UniqueValuesServiceImpl.java | 50 ++++++++++++++------ 4 files changed, 125 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/e0d95bf0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java index d9c1aa4..7cc24eb 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.usergrid.persistence.collection.uniquevalues; import akka.actor.ActorRef; @@ -5,6 +21,8 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.routing.ConsistentHashingRouter; import akka.routing.FromConfig; +import com.google.inject.Inject; +import com.google.inject.Injector; /** @@ -14,10 +32,10 @@ public class ClusterSingletonRouter extends UntypedActor { private final ActorRef router; - - public ClusterSingletonRouter( UniqueValuesTable table ) { - router = getContext().actorOf( - FromConfig.getInstance().props(Props.create(UniqueValueActor.class, table )), "router"); + @Inject + public ClusterSingletonRouter( Injector injector ) { + router = getContext().actorOf( FromConfig.getInstance().props( + Props.create( GuiceActorProducer.class, injector, UniqueValueActor.class)), "router"); } @Override @@ -27,7 +45,7 @@ public class ClusterSingletonRouter extends UntypedActor { UniqueValueActor.Request request = (UniqueValueActor.Request)message; ConsistentHashingRouter.ConsistentHashableEnvelope envelope = - new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getConsistentHashKey() ); + new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getConsistentHashKey() ); router.tell( envelope, getSender()); } else { http://git-wip-us.apache.org/repos/asf/usergrid/blob/e0d95bf0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/GuiceActorProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/GuiceActorProducer.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/GuiceActorProducer.java new file mode 100644 index 0000000..0e99bca --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/GuiceActorProducer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.collection.uniquevalues; + +import akka.actor.Actor; +import akka.actor.IndirectActorProducer; +import com.google.inject.Injector; + + +public class GuiceActorProducer implements IndirectActorProducer { + + final Injector injector; + final Class<? extends Actor> actorClass; + + public GuiceActorProducer(Injector injector, Class<? extends Actor> actorClass) { + this.injector = injector; + this.actorClass = actorClass; + } + + @Override + public Class<? extends Actor> actorClass() { + return actorClass; + } + + @Override + public Actor produce() { + return injector.getInstance( actorClass ); + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/e0d95bf0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index 7becd47..fe2e356 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -1,9 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.usergrid.persistence.collection.uniquevalues; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; +import com.google.inject.Inject; import org.apache.commons.lang3.RandomStringUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -21,15 +38,15 @@ public class UniqueValueActor extends UntypedActor { //private MetricsService metricsService; - final private UniqueValuesTable table; - final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator(); private int count = 0; + @Inject + UniqueValuesTable table; + - public UniqueValueActor( UniqueValuesTable table ) { - this.table = table; + public UniqueValueActor( ) { } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/e0d95bf0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java index b365b2b..5c12165 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java @@ -31,6 +31,7 @@ import akka.util.Timeout; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; import com.google.inject.Inject; +import com.google.inject.Injector; import com.google.inject.Singleton; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -52,6 +53,7 @@ import java.util.concurrent.TimeUnit; public class UniqueValuesServiceImpl implements UniqueValuesService { private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class ); + private final Injector injector; AkkaFig akkaFig; UniqueValuesTable table; private String hostname; @@ -78,7 +80,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { @Inject - public UniqueValuesServiceImpl( AkkaFig akkaFig, UniqueValuesTable table ) { + public UniqueValuesServiceImpl(Injector injector, AkkaFig akkaFig, UniqueValuesTable table ) { + this.injector = injector; this.akkaFig = akkaFig; this.table = table; @@ -155,16 +158,25 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { // Create one actor system with request actor for each region if ( StringUtils.isEmpty( hostname )) { - throw new RuntimeException( "No value specified for akka.hostname"); + throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_HOSTNAME ); } if ( StringUtils.isEmpty( currentRegion )) { - throw new RuntimeException( "No value specified for akka.region"); + throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_REGION ); + } + + if ( StringUtils.isEmpty( akkaFig.getRegionList() )) { + throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_REGION_LIST ); + } + + if ( StringUtils.isEmpty( akkaFig.getRegionSeeds() )) { + throw new RuntimeException( "No value specified for " + AkkaFig.AKKA_REGION_SEEDS); } List regionList = Arrays.asList( akkaFig.getRegionList().toLowerCase().split(",") ); - logger.info("Initializing Akka for hostname {} region {} regionList {}", hostname, currentRegion, regionList); + logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", + hostname, currentRegion, regionList, akkaFig.getRegionSeeds() ); String typesValue = akkaFig.getRegionTypes(); String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(","); @@ -230,9 +242,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { // create cluster singleton supervisor for actor system ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create( system ).withRole("io"); + + // Akka.system().actorOf(Props.create(GuiceInjectedActor.class, INJECTOR,Retreiver.class)) + system.actorOf( ClusterSingletonManager.props( - Props.create( ClusterSingletonRouter.class, table ), - PoisonPill.getInstance(), settings ), "uvRouter"); + //Props.create( ClusterSingletonRouter.class, table ), + Props.create( GuiceActorProducer.class, injector, ClusterSingletonRouter.class), + PoisonPill.getInstance(), settings ), "uvRouter"); } // create proxy for sending messages to singleton @@ -450,10 +466,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } catch ( UniqueValueException e ) { for (Field field : entity.getFields()) { - try { - cancelUniqueField( scope, entity, version, field, region ); - } catch (Throwable ignored) { - logger.debug( "Error canceling unique field", ignored ); + if (field.isUnique()) { + try { + cancelUniqueField( scope, entity, version, field, region ); + } catch (Throwable ignored) { + logger.debug( "Error canceling unique field", ignored ); + } } } throw e; @@ -480,10 +498,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { } catch ( UniqueValueException e ) { for (Field field : entity.getFields()) { - try { - cancelUniqueField( scope, entity, version, field, region) ; - } catch (Throwable ignored) { - logger.debug( "Error canceling unique field", ignored ); + if (field.isUnique()) { + try { + cancelUniqueField( scope, entity, version, field, region ); + } catch (Throwable ex ) { + logger.error( "Error canceling unique field", ex ); + } } } throw e; @@ -546,7 +566,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService { throw new RuntimeException( "No request actor for type, cannot verify unique fields!" ); } - UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation( + UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation( scope, entity.getId(), version, field ); requestActor.tell( request, null );
