Changes to get Qakka using same injector as rest of Usergrid

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/18e4305b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/18e4305b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/18e4305b

Branch: refs/heads/usergrid-1318-queue
Commit: 18e4305b995be88947b72172dd22056702659a8e
Parents: 832b505
Author: Dave Johnson <snoopd...@apache.org>
Authored: Fri Sep 16 18:33:10 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Fri Sep 16 18:33:10 2016 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  3 ++
 .../usergrid/persistence/core/CassandraFig.java |  2 +-
 .../index/impl/EsIndexProducerImpl.java         |  2 --
 .../apache/usergrid/persistence/qakka/App.java  | 12 +++----
 .../usergrid/persistence/qakka/QakkaModule.java |  6 ++--
 .../impl/DistributedQueueServiceImpl.java       | 35 ++++++++++++++------
 6 files changed, 38 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5d8c417..4bec92d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -60,6 +60,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.qakka.App;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
 import 
org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
@@ -151,6 +152,8 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
             try {
                 logger.info("Akka cluster starting...");
 
+                // TODO: fix this kludge
+                injector.getInstance( App.class );
                 this.actorSystemManager = injector.getInstance( 
ActorSystemManager.class );
 
                 actorSystemManager.registerRouterProducer( 
injector.getInstance( UniqueValuesService.class ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index b599a20..bc8d087 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -91,7 +91,7 @@ public interface CassandraFig extends GuicyFig {
     @Default( "Usergrid_Applications" )
     String getApplicationKeyspace();
 
-    @Key( "cassandra.keyspace.application_local" )
+    @Key( "cassandra.keyspace.application.local" )
     @Default( "Usergrid_Applications_Local" )
     String getApplicationLocalKeyspace();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 10d5e4a..8f58ef7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -57,7 +57,6 @@ public class EsIndexProducerImpl implements IndexProducer {
     private final IndexFig config;
     private final FailureMonitorImpl failureMonitor;
     private final Client client;
-    private final Timer flushTimer;
     private final IndexFig indexFig;
     private final Counter indexSizeCounter;
     private final Histogram roundtripTimer;
@@ -70,7 +69,6 @@ public class EsIndexProducerImpl implements IndexProducer {
     @Inject
     public EsIndexProducerImpl(final IndexFig config, final EsProvider 
provider,
                                final MetricsFactory metricsFactory, final 
IndexFig indexFig) {
-        this.flushTimer = metricsFactory.getTimer(EsIndexProducerImpl.class, 
"index_buffer.flush");
         this.indexSizeCounter = 
metricsFactory.getCounter(EsIndexProducerImpl.class, "index_buffer.size");
         this.roundtripTimer = 
metricsFactory.getHistogram(EsIndexProducerImpl.class, 
"index_buffer.message_cycle");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index 9d9c972..41bc6fa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -59,12 +59,12 @@ public class App implements MetricsService {
         this.actorSystemFig = actorSystemFig;
         this.actorSystemManager = actorSystemManager;
         this.distributedQueueService = distributedQueueService;
-
-        try {
-            migrationManager.migrate();
-        } catch (MigrationException e) {
-            throw new QakkaRuntimeException( "Error running migration", e );
-        }
+//
+//        try {
+//            migrationManager.migrate();
+//        } catch (MigrationException e) {
+//            throw new QakkaRuntimeException( "Error running migration", e );
+//        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index 0c37e82..d1d8d7e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -64,8 +64,8 @@ public class QakkaModule extends AbstractModule {
             // TODO: reconcile with usergrid props
             // load properties from one properties file using Netflix Archaius 
so that GuicyFig will see them
             ConfigurationManager.loadCascadedPropertiesFromResources( "qakka" 
);
-        } catch (IOException e) {
-            logger.warn("Unable to load qakka.properties");
+        } catch (Throwable t) {
+            logger.warn("Unable to load qakka.properties (can be ignored in 
Usergrid)");
         }
     }
 
@@ -105,11 +105,11 @@ public class QakkaModule extends AbstractModule {
         Multibinder<Migration> migrationBinder = Multibinder.newSetBinder( 
binder(), Migration.class );
 
         migrationBinder.addBinding().to( Key.get( AuditLogSerialization.class 
) );
-        //migrationBinder.addBinding().to( Key.get( 
MessageCounterSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( 
QueueMessageSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( QueueSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( 
ShardCounterSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( 
TransferLogSerialization.class ) );
+        //migrationBinder.addBinding().to( Key.get( 
MessageCounterSerialization.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/18e4305b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 1243c23..ec667e6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import 
org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -185,23 +186,37 @@ public class DistributedQueueServiceImpl implements 
DistributedQueueService {
                 // ask ClientActor and wait (up to timeout) for response
 
                 Future<Object> fut = Patterns.ask( 
actorSystemManager.getClientActor(), request, t );
-                final QakkaMessage response = (QakkaMessage)Await.result( fut, 
t.duration() );
+                Object responseObject = Await.result( fut, t.duration() );
+
+                if ( responseObject instanceof QakkaMessage ) {
 
-                if ( response != null && response instanceof QueueGetResponse) 
{
-                    QueueGetResponse qprm = (QueueGetResponse)response;
-                    if ( qprm.isSuccess() ) {
-                        if (retries > 1) {
-                            logger.debug( "getNextMessage SUCCESS after {} 
retries", retries );
+                    final QakkaMessage response = (QakkaMessage)Await.result( 
fut, t.duration() );
+
+                    if ( response != null && response instanceof 
QueueGetResponse) {
+                        QueueGetResponse qprm = (QueueGetResponse)response;
+                        if ( qprm.isSuccess() ) {
+                            if (retries > 1) {
+                                logger.debug( "getNextMessage SUCCESS after {} 
retries", retries );
+                            }
                         }
+                        return qprm.getQueueMessages();
+
+
+                    } else if ( response != null  ) {
+                        logger.debug("ERROR RESPONSE (1) popping queue, 
retrying {}", retries );
+
+                    } else {
+                        logger.debug("TIMEOUT popping to queue, retrying {}", 
retries );
                     }
-                    return qprm.getQueueMessages();
 
+                } else if ( responseObject instanceof 
ClientActor.ErrorResponse ) {
 
-                } else if ( response != null  ) {
-                    logger.debug("ERROR RESPONSE popping queue, retrying {}", 
retries );
+                    final ClientActor.ErrorResponse errorResponse = 
(ClientActor.ErrorResponse)responseObject;
+                    logger.debug("ACTORSYSTEM ERROR popping queue: {}, 
retrying {}",
+                        errorResponse.getMessage(), retries );
 
                 } else {
-                    logger.debug("TIMEOUT popping to queue, retrying {}", 
retries );
+                    logger.debug("UNKNOWN RESPONSE popping queue, retrying 
{}", retries );
                 }
 
             } catch ( Exception e ) {

Reply via email to