Repository: usergrid Updated Branches: refs/heads/master 2108f6dc1 -> c62a34835
Test fixes in queue module only. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c62a3483 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c62a3483 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c62a3483 Branch: refs/heads/master Commit: c62a348351fefd0689f0e4e7708b37a4bc0b2ac8 Parents: 2108f6d Author: Dave Johnson <[email protected]> Authored: Fri Nov 11 15:32:42 2016 -0500 Committer: Dave Johnson <[email protected]> Committed: Fri Nov 11 15:32:42 2016 -0500 ---------------------------------------------------------------------- .../apache/usergrid/persistence/qakka/QakkaFig.java | 2 +- .../impl/MessageCounterSerializationImpl.java | 4 +++- .../sharding/impl/ShardCounterSerializationImpl.java | 9 ++------- .../qakka/distributed/actors/ShardAllocatorTest.java | 14 +++++++++----- .../queue/src/test/resources/log4j.properties | 9 ++++++++- .../queue/src/test/resources/qakka.properties | 6 +++--- 6 files changed, 26 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index c034b92..061807b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable { int getMaxTtlSeconds(); @Key(QUEUE_IN_MEMORY) - @Default("false") + @Default("true") boolean getInMemoryCache(); @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC) http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index c17c34f..b43aa76 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -231,7 +231,9 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } } - saveIfNeeded( queueName, type ); + synchronized ( inMemoryCounters ) { + saveIfNeeded( queueName, type ); + } return inMemoryCounters.get( key ).value(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index 16e31a1..cc94d85 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -132,11 +132,9 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization inMemoryCounters.get( key ).getIncrement().addAndGet( increment ); return; } - } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment ); if (totalIncrement > maxInMemoryIncrement) { @@ -168,11 +166,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization inMemoryCounters.put( key, new InMemoryCount( value )); } } - } - - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); if ( inMemoryCount.needsUpdate() ) { long totalIncrement = inMemoryCount.getIncrement().get(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index 0ac1537..f090bd5 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -53,7 +53,7 @@ import java.util.Optional; public class ShardAllocatorTest extends AbstractTest { - private static final Logger logger = LoggerFactory.getLogger( QueueReaderTest.class ); + private static final Logger logger = LoggerFactory.getLogger( ShardAllocatorTest.class ); @Override @@ -127,7 +127,7 @@ public class ShardAllocatorTest extends AbstractTest { // Increment last shard by 20% of max shardCounterSer.incrementCounter( - queueName, Shard.Type.DEFAULT, lastShard.getShardId(), (long)(0.2 * maxPerShard) ); + queueName, Shard.Type.DEFAULT, lastShard.getShardId(), (long)(0.3 * maxPerShard) ); // Run shard allocator again @@ -191,6 +191,8 @@ public class ShardAllocatorTest extends AbstractTest { queueManager.createQueue( new Queue( queueName )); + distributedQueueService.refresh(); + try { // Create number of messages @@ -198,6 +200,7 @@ public class ShardAllocatorTest extends AbstractTest { int numMessages = 400; for (int i = 0; i < numMessages; i++) { + queueMessageManager.sendMessages( queueName, Collections.singletonList( region ), @@ -205,15 +208,16 @@ public class ShardAllocatorTest extends AbstractTest { null, // expiration "application/json", DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) ); - Thread.sleep( 10 ); + + Thread.sleep( 50 ); } distributedQueueService.refresh(); // Test that approximately right number of shards created int shardCount = countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ); - Assert.assertTrue( shardCount + " is too few shards", shardCount > 7 ); - Assert.assertTrue( shardCount + " is too many shards", shardCount < 17 ); + Assert.assertTrue( shardCount + " is too few shards", shardCount > 15 ); + Assert.assertTrue( shardCount + " is too many shards", shardCount < 40 ); } finally { queueManager.deleteQueue( queueName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties index d542096..1b14d87 100644 --- a/stack/corepersistence/queue/src/test/resources/log4j.properties +++ b/stack/corepersistence/queue/src/test/resources/log4j.properties @@ -24,6 +24,13 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n log4j.logger.org.apache.cassandra=WARN log4j.logger.org.glassfish=WARN -log4j.logger.org.apache.usergrid.persistence.qakka=INFO +log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG log4j.logger.org.apache.usergrid.persistence.queue=INFO log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO + +#log4j.logger.org.apache.usergrid.persistence.qakka.serialization.queuemessages=TRACE +log4j.logger.org.apache.usergrid.persistence.qakka.serialization.sharding=TRACE +#log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=TRACE +log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors.ShardAllocator=TRACE + + http://git-wip-us.apache.org/repos/asf/usergrid/blob/c62a3483/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 0c8e686..3d1888e 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -36,7 +36,7 @@ usergrid.cluster.seeds=us-east:localhost # Port used for cluster communications. usergrid.cluster.port=3545 -queue.inmemory.cache=false +queue.inmemory.cache=true queue.num.actors=50 queue.sender.num.actors=100 @@ -51,8 +51,8 @@ queue.shard.allocation.check.frequency.millis=100 queue.shard.allocation.advance.time.millis=200 # set low for testing purposes -queue.shard.counter.max-in-memory=10 -queue.message.counter.max-in-memory=10 +queue.shard.counter.max-in-memory=50 +queue.message.counter.max-in-memory=50 queue.long.polling.time.millis=2000
