Logging changes & import cleanups.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d5e407f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d5e407f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d5e407f

Branch: refs/heads/master
Commit: 1d5e407fd4ab961886eb7cf4d32a1f3d194dbe1a
Parents: 1980562
Author: Dave Johnson <[email protected]>
Authored: Thu Nov 3 17:20:36 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Thu Nov 3 17:20:36 2016 -0400

----------------------------------------------------------------------
 .../actorsystem/ActorSystemManagerImpl.java           |  9 ++++++++-
 .../persistence/qakka/core/impl/InMemoryQueue.java    |  5 ++++-
 .../qakka/core/impl/QueueMessageManagerImpl.java      |  9 ++++++---
 .../qakka/distributed/actors/QueueSender.java         |  4 ++++
 .../impl/QueueMessageSerializationImpl.java           |  8 ++++----
 .../sharding/impl/ShardSerializationImpl.java         |  4 ++++
 .../persistence/queue/impl/QakkaQueueManager.java     |  2 ++
 .../usergrid/rest/system/QueueSystemResource.java     | 14 +++++++++-----
 8 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 687715a..96ebe69 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -313,7 +313,10 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                     }} );
 
                     put( "cluster", new HashMap<String, Object>() {{
-                        put( "max-nr-of-instances-per-node", numInstancesPerNode); // this sets default if router does not set
+
+                        // this sets default if router does not set
+                        put( "max-nr-of-instances-per-node", numInstancesPerNode);
+
                         put( "roles", Collections.singletonList("io") );
                         put( "seed-nodes", new ArrayList<String>() {{
                             for (String seed : seeds) {
@@ -337,6 +340,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                 routerProducer.addConfiguration( configMap );
             }
 
+            logger.debug("Actor system configMap: " + configMap );
+
             config = ConfigFactory.parseMap( configMap )
                 .withFallback( ConfigFactory.load( "application.conf" ) );
 
@@ -409,6 +414,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             } else {
 
+                logger.info( "Creating clusterClient for region [{}]", region );
+
                 Set<ActorPath> seedPaths = new HashSet<>(20);
                 for ( String seed : getSeedsByRegion().get( region ) ) {
                     seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist") );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 6a26483..4315df0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -26,7 +26,10 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 59a14bd..ac2857f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -32,8 +32,6 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -42,7 +40,10 @@ import org.slf4j.LoggerFactory;
 import java.io.UnsupportedEncodingException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
 
 
 @Singleton
@@ -92,6 +93,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
             throw new NotFoundException( "Queue " + queueName + " not found" );
         }
 
+        logger.trace("Sending message to queue {} regions {}", queueName, destinationRegions);
+
         // TODO: implement delay and expiration
 
 //        Preconditions.checkArgument(delayMs == null || delayMs > 0L,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index ccc39f5..461c28f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -129,12 +129,16 @@ public class QueueSender extends UntypedActor {
 
                     if (actorSystemManager.getCurrentRegion().equals( destRegion )) {
 
+                        logger.trace("Sending queue {} message to local region {}", queueName, destRegion );
+
                         // send to current region via local clientActor
                         ActorRef clientActor = actorSystemManager.getClientActor();
                         fut = Patterns.ask( clientActor, request, t );
 
                     } else {
 
+                        logger.trace("Sending queue {} message to remote region {}", queueName, destRegion );
+
                         // send to remote region via cluster client for that region
                         ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion );
                         fut = Patterns.ask(

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index b1b57ae..fb49825 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -347,15 +347,15 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
             while (defaultShardIterator.hasNext()) {
                 Shard shard = defaultShardIterator.next();
-                deleteAllBatch.add( createDeleteAllMessagesStatement( shard ) );
+                Statement deleteAll = createDeleteAllMessagesStatement( shard );
+                deleteAllBatch.add( deleteAll );
 
-                logger.trace("added queueName {} type {} shard {}",
-                    queueName, shardType, shard.getShardId() );
+                logger.trace("Deleting messages for queue {} shardType {} shard {} query {}",
+                    queueName, shardType, shard.getShardId(), deleteAll.toString() );
             }
         }
 
         cassandraClient.getQueueMessageSession().execute( deleteAllBatch );
-        logger.trace("deleted messages in queue: " + queueName);
 
         // clear counters, we only want to this to happen after successful deletion
         for ( Shard.Type shardType : shardTypes ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
index 501a6ca..4f59057 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -164,9 +164,13 @@ public class ShardSerializationImpl implements ShardSerialization {
                 .where( QueryBuilder.eq(COLUMN_QUEUE_NAME, queueName) )
                 .and( QueryBuilder.eq(COLUMN_REGION, region) );
 
+            logger.trace("Removing shards for queue {} region {} shardType {} query {}",
+                queueName, region, shardType, batch.toString());
+
             batch.add( delete );
         }
 
+
         cassandraClient.getQueueMessageSession().execute( batch );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index b81e888..9d7a341 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -80,6 +80,8 @@ public class QakkaQueueManager implements LegacyQueueManager {
     @Override
     public <T extends Serializable> void sendMessage(T body) throws IOException {
 
+        logger.debug( "Sending message to queue {} region {}", this.scope.getRegionImplementation().name() );
+
         createQueueIfNecessary();
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
index a9d3e1b..16e77d0 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
@@ -153,23 +153,27 @@ public class QueueSystemResource extends AbstractContextResource {
         final List<String> listOfQueues = queueManager.getListOfQueues();
         for ( String queueName : listOfQueues ) {
 
+            Map<String, Object> queueInfo = new HashMap<>();
             try {
-                Map<String, Object> queueInfo = new HashMap<>();
 
                 queueInfo.put( "name", queueName );
+
+                queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) );
+
+                UUID newest = inMemoryQueue.getNewest( queueName );
+                queueInfo.put( "since", newest == null ? "null" : newest.timestamp() );
+
                 queueInfo.put( "depth",
                     queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
                 queueInfo.put( "inflight",
                     queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.INFLIGHT ) );
-                queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) );
 
-                UUID newest = inMemoryQueue.getNewest( queueName );
-                queueInfo.put( "since", newest == null ? "null" : newest.timestamp() );
 
-                queues.add( queueInfo );
             } catch ( Exception e ) {
                 logger.error("Error getting queue info for queue: " + queueName, e);
             }
+
+            queues.add( queueInfo );
         }
 
         info.put("queues", queues);

Reply via email to