Repository: usergrid Updated Branches: refs/heads/usergrid-1318-queue 2a8679512 -> ab26fa526
Message counter fixes Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fc68e427 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fc68e427 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fc68e427 Branch: refs/heads/usergrid-1318-queue Commit: fc68e4270de147d0c37cd1fc31c0407a25c6a711 Parents: 2a86795 Author: Dave Johnson <[email protected]> Authored: Thu Nov 10 10:43:32 2016 -0500 Committer: Dave Johnson <[email protected]> Committed: Thu Nov 10 10:43:32 2016 -0500 ---------------------------------------------------------------------- stack/corepersistence/common/pom.xml | 4 ++ .../impl/MessageCounterSerializationImpl.java | 65 +++++++++---------- .../distributed/QueueActorServiceTest.java | 67 +++++++++++++++++++- .../distributed/actors/ShardAllocatorTest.java | 4 +- .../DatabaseQueueMessageSerializationTest.java | 5 +- .../queue/src/test/resources/qakka.properties | 2 + .../rest/system/QueueSystemResource.java | 2 +- 7 files changed, 105 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/common/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml index 63d339b..aacec73 100644 --- a/stack/corepersistence/common/pom.xml +++ b/stack/corepersistence/common/pom.xml @@ -80,6 +80,10 @@ <artifactId>netty</artifactId> <groupId>io.netty</groupId> </exclusion> + <exclusion> + <artifactId>logback-classic</artifactId> + <groupId>ch.qos.logback</groupId> + </exclusion> </exclusions> </dependency> http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 2e7722d..c17c34f 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -86,6 +86,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat final AtomicLong decrement = new AtomicLong( 0L ); long lastWritten = 0L; + InMemoryCount( long baseCount ) { this.baseCount = baseCount; } @@ -95,7 +96,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } public void decrement( long dec ) { this.decrement.addAndGet( dec ); - this.totalInMemoryCount.addAndGet( -dec ); + this.totalInMemoryCount.addAndGet( dec ); } public long getIncrement() { return increment.get(); @@ -114,6 +115,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat this.baseCount = 0; this.increment.set( 0L ); this.decrement.set( 0L ); + this.totalInMemoryCount.set( 0L ); } public long value() { // return totalInMemoryCount.get(); // for testing using just in-memory counter: @@ -135,7 +137,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat this.cassandraConfig = cassandraConfig; this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory(); this.cassandraClient = cassandraClient; - this.writeTimeout = qakkaFig.getShardCounterWriteTimeoutMillis(); + this.writeTimeout = qakkaFig.getMessageCounterWriteTimeoutMillis(); } @@ -149,32 +151,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat String key = buildKey( queueName, type ); - synchronized ( inMemoryCounters ) { + synchronized (inMemoryCounters) { if (inMemoryCounters.get( key ) == null) { - Long value = retrieveCounterFromStorage( queueName, type ); + Long value = retrieveCounterFromStorage( queueName, type ); - if (value == null) { - incrementCounterInStorage( queueName, type, 0L ); - inMemoryCounters.put( key, new InMemoryCount( 0L ) ); - } else { - inMemoryCounters.put( key, new InMemoryCount( value ) ); + if (value == null) { + incrementCounterInStorage( queueName, type, 0L ); + inMemoryCounters.put( key, new InMemoryCount( 0L ) ); + } else { + inMemoryCounters.put( key, new InMemoryCount( value ) ); + } } - } - } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { inMemoryCount.increment( increment ); saveIfNeeded( queueName, type ); } if ( logger.isDebugEnabled() ) { - long value = inMemoryCounters.get( key ).value(); + long value = getCounterValue( queueName, type ); if (value <= 0) { - logger.debug( "Queue {} type {} incremented {} count = {}", queueName, type, increment, value ); + logger.debug( "Queue {} type {} incremented {} count {}", queueName, type, increment, value ); } } } @@ -198,19 +198,17 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat inMemoryCounters.put( key, new InMemoryCount( value ) ); } } - } - final InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + final InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { inMemoryCount.decrement( decrement ); saveIfNeeded( queueName, type ); } if ( logger.isDebugEnabled() ) { - long value = inMemoryCounters.get( key ).value(); + long value = getCounterValue( queueName, type ); if (value <= 0) { - logger.debug( "Queue {} type {} incremented count = {}", queueName, type, value ); + logger.debug( "Queue {} type {} decremented {} count {}", queueName, type, decrement, value ); } } } @@ -233,17 +231,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat } } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - - synchronized ( inMemoryCount ) { - - if ( inMemoryCount.needsUpdate() ) { - long totalIncrement = inMemoryCount.getIncrement(); - incrementCounterInStorage( queueName, type, totalIncrement ); - inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); - inMemoryCount.clearDeltas(); - } - } + saveIfNeeded( queueName, type ); return inMemoryCounters.get( key ).value(); } @@ -331,22 +319,27 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { + if ( inMemoryCount.needsUpdate() || numChanges.incrementAndGet() > maxChangesBeforeSave ) { long totalIncrement = inMemoryCount.getIncrement(); - incrementCounterInStorage( queueName, type, totalIncrement ); - long totalDecrement = inMemoryCount.getDecrement(); + long baseCount = retrieveCounterFromStorage( queueName, type ); + + logger.debug("Saved counter {} type {} to storage baseCount {} inc {} dec {}", + queueName, type, baseCount, totalIncrement, totalDecrement ); + + incrementCounterInStorage( queueName, type, totalIncrement ); decrementCounterInStorage( queueName, type, totalDecrement ); - long baseCount = retrieveCounterFromStorage( queueName, type ); + baseCount = retrieveCounterFromStorage( queueName, type ); - logger.debug("Writing queue counter {} type {} to storage count = {}", queueName, type, baseCount ); + logger.debug("Saved counter {} type {} to storage baseCount {}", queueName, type, baseCount ); inMemoryCount.setBaseCount( baseCount ); inMemoryCount.clearDeltas(); numChanges.set( 0 ); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 053c093..da5d166 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -125,7 +125,6 @@ public class QueueActorServiceTest extends AbstractTest { DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class ); TransferLogSerialization xferLogSerialization = injector.getInstance( TransferLogSerialization.class ); - InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class ); QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManager.class ); String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID(); @@ -186,4 +185,70 @@ public class QueueActorServiceTest extends AbstractTest { queueManager.deleteQueue( queueName ); } } + + + @Test + public void testQueueMessageCounter() throws InterruptedException { + + Injector injector = getInjector(); + + ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); + String region = actorSystemFig.getRegionLocal(); + + App app = injector.getInstance( App.class ); + app.start("localhost", getNextAkkaPort(), region); + + DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); + QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class ); + TransferLogSerialization xferLogSerialization = injector.getInstance( TransferLogSerialization.class ); + QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManager.class ); + + String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID(); + QueueManager queueManager = injector.getInstance( QueueManager.class ); + + try { + + queueManager.createQueue( + new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) ); + + UUID messageId = UUIDGen.getTimeUUID(); + + final String data = "my test data"; + final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody( + DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" ); + serialization.writeMessageData( messageId, messageBody ); + + xferLogSerialization.recordTransferLog( + queueName, actorSystemFig.getRegionLocal(), region, messageId ); + + distributedQueueService.sendMessageToRegion( + queueName, region, region, messageId, null, null ); + + DatabaseQueueMessage.Type type = DatabaseQueueMessage.Type.DEFAULT; + + Thread.sleep(5000); + + int maxRetries = 10; + int retries = 0; + long count = 0; + while (retries++ < maxRetries) { + distributedQueueService.refresh(); + count = queueMessageManager.getQueueDepth( queueName, type ); + if ( count > 0 ) { + break; + } + Thread.sleep( 1000 ); + } + + Thread.sleep( 1000 ); + + Assert.assertEquals( 1, queueMessageManager.getQueueDepth( queueName, type ) ); + + distributedQueueService.shutdown(); + + } finally { + queueManager.deleteQueue( queueName ); + } + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index ecacccc..0ac1537 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -212,8 +212,8 @@ public class ShardAllocatorTest extends AbstractTest { // Test that approximately right number of shards created int shardCount = countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ); - Assert.assertTrue( "shards > 7", shardCount > 7 ); - Assert.assertTrue( "shards < 17", shardCount < 17 ); + Assert.assertTrue( shardCount + " is too few shards", shardCount > 7 ); + Assert.assertTrue( shardCount + " is too many shards", shardCount < 17 ); } finally { queueManager.deleteQueue( queueName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java index 88d89de..3c09027 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java @@ -24,16 +24,13 @@ import com.datastax.driver.core.ProtocolVersion; import com.google.inject.Injector; import org.apache.commons.lang.RandomStringUtils; import org.apache.usergrid.persistence.qakka.AbstractTest; -import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException; -import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.core.QakkaUtils; +import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard; import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization; import org.junit.Test; import java.io.*; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 464b48d..0c8e686 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -36,6 +36,8 @@ usergrid.cluster.seeds=us-east:localhost # Port used for cluster communications. usergrid.cluster.port=3545 +queue.inmemory.cache=false + queue.num.actors=50 queue.sender.num.actors=100 queue.writer.num.actors=100 http://git-wip-us.apache.org/repos/asf/usergrid/blob/fc68e427/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java index 16e77d0..6345687 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java @@ -191,7 +191,7 @@ public class QueueSystemResource extends AbstractContextResource { @PathParam("queueName") String queueName, @QueryParam("callback") @DefaultValue("callback") String callback ) { - logger.debug("DMJ_TEMP clearQueue"); + logger.debug("clearQueue"); QueueManager queueManager = injector.getInstance( QueueManager.class ); QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class );
