Logging changes & import cleanups.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1d5e407f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1d5e407f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1d5e407f Branch: refs/heads/master Commit: 1d5e407fd4ab961886eb7cf4d32a1f3d194dbe1a Parents: 1980562 Author: Dave Johnson <[email protected]> Authored: Thu Nov 3 17:20:36 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Thu Nov 3 17:20:36 2016 -0400 ---------------------------------------------------------------------- .../actorsystem/ActorSystemManagerImpl.java | 9 ++++++++- .../persistence/qakka/core/impl/InMemoryQueue.java | 5 ++++- .../qakka/core/impl/QueueMessageManagerImpl.java | 9 ++++++--- .../qakka/distributed/actors/QueueSender.java | 4 ++++ .../impl/QueueMessageSerializationImpl.java | 8 ++++---- .../sharding/impl/ShardSerializationImpl.java | 4 ++++ .../persistence/queue/impl/QakkaQueueManager.java | 2 ++ .../usergrid/rest/system/QueueSystemResource.java | 14 +++++++++----- 8 files changed, 41 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 687715a..96ebe69 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -313,7 +313,10 @@ public class ActorSystemManagerImpl implements ActorSystemManager { }} ); put( "cluster", new HashMap<String, Object>() {{ - put( "max-nr-of-instances-per-node", numInstancesPerNode); // this sets default if router does not set + + // this sets default if router does not set + put( "max-nr-of-instances-per-node", numInstancesPerNode); + put( "roles", Collections.singletonList("io") ); put( "seed-nodes", new ArrayList<String>() {{ for (String seed : seeds) { @@ -337,6 +340,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { routerProducer.addConfiguration( configMap ); } + logger.debug("Actor system configMap: " + configMap ); + config = ConfigFactory.parseMap( configMap ) .withFallback( ConfigFactory.load( "application.conf" ) ); @@ -409,6 +414,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } else { + logger.info( "Creating clusterClient for region [{}]", region ); + Set<ActorPath> seedPaths = new HashSet<>(20); for ( String seed : getSeedsByRegion().get( region ) ) { seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist") ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java index 6a26483..4315df0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java @@ -26,7 +26,10 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java index 59a14bd..ac2857f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -32,8 +32,6 @@ import org.apache.usergrid.persistence.qakka.serialization.queuemessages.Databas import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization; import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; -import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization; import org.slf4j.Logger; @@ -42,7 +40,10 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; @Singleton @@ -92,6 +93,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager { throw new NotFoundException( "Queue " + queueName + " not found" ); } + logger.trace("Sending message to queue {} regions {}", queueName, destinationRegions); + // TODO: implement delay and expiration // Preconditions.checkArgument(delayMs == null || delayMs > 0L, http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java index ccc39f5..461c28f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java @@ -129,12 +129,16 @@ public class QueueSender extends UntypedActor { if (actorSystemManager.getCurrentRegion().equals( destRegion )) { + logger.trace("Sending queue {} message to local region {}", queueName, destRegion ); + // send to current region via local clientActor ActorRef clientActor = actorSystemManager.getClientActor(); fut = Patterns.ask( clientActor, request, t ); } else { + logger.trace("Sending queue {} message to remote region {}", queueName, destRegion ); + // send to remote region via cluster client for that region ActorRef clusterClient = actorSystemManager.getClusterClient( destRegion ); fut = Patterns.ask( http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java index b1b57ae..fb49825 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java @@ -347,15 +347,15 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization while (defaultShardIterator.hasNext()) { Shard shard = defaultShardIterator.next(); - deleteAllBatch.add( createDeleteAllMessagesStatement( shard ) ); + Statement deleteAll = createDeleteAllMessagesStatement( shard ); + deleteAllBatch.add( deleteAll ); - logger.trace("added queueName {} type {} shard {}", - queueName, shardType, shard.getShardId() ); + logger.trace("Deleting messages for queue {} shardType {} shard {} query {}", + queueName, shardType, shard.getShardId(), deleteAll.toString() ); } } cassandraClient.getQueueMessageSession().execute( deleteAllBatch ); - logger.trace("deleted messages in queue: " + queueName); // clear counters, we only want to this to happen after successful deletion for ( Shard.Type shardType : shardTypes ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java index 501a6ca..4f59057 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java @@ -164,9 +164,13 @@ public class ShardSerializationImpl implements ShardSerialization { .where( QueryBuilder.eq(COLUMN_QUEUE_NAME, queueName) ) .and( QueryBuilder.eq(COLUMN_REGION, region) ); + logger.trace("Removing shards for queue {} region {} shardType {} query {}", + queueName, region, shardType, batch.toString()); + batch.add( delete ); } + cassandraClient.getQueueMessageSession().execute( batch ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java index b81e888..9d7a341 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java @@ -80,6 +80,8 @@ public class QakkaQueueManager implements LegacyQueueManager { @Override public <T extends Serializable> void sendMessage(T body) throws IOException { + logger.debug( "Sending message to queue {} region {}", this.scope.getRegionImplementation().name() ); + createQueueIfNecessary(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/1d5e407f/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java index a9d3e1b..16e77d0 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java @@ -153,23 +153,27 @@ public class QueueSystemResource extends AbstractContextResource { final List<String> listOfQueues = queueManager.getListOfQueues(); for ( String queueName : listOfQueues ) { + Map<String, Object> queueInfo = new HashMap<>(); try { - Map<String, Object> queueInfo = new HashMap<>(); queueInfo.put( "name", queueName ); + + queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) ); + + UUID newest = inMemoryQueue.getNewest( queueName ); + queueInfo.put( "since", newest == null ? "null" : newest.timestamp() ); + queueInfo.put( "depth", queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.DEFAULT ) ); queueInfo.put( "inflight", queueMessageManager.getQueueDepth( queueName, DatabaseQueueMessage.Type.INFLIGHT ) ); - queueInfo.put( "inmemory", inMemoryQueue.size( queueName ) ); - UUID newest = inMemoryQueue.getNewest( queueName ); - queueInfo.put( "since", newest == null ? "null" : newest.timestamp() ); - queues.add( queueInfo ); } catch ( Exception e ) { logger.error("Error getting queue info for queue: " + queueName, e); } + + queues.add( queueInfo ); } info.put("queues", queues);
