Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 9f2863fd6 -> 775257d27
Another counter concurrency fix, plus test stabilization changes

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/041109fb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/041109fb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/041109fb

Branch: refs/heads/usergrid-1318-queue
Commit: 041109fb16287d1e6194f29658d7445c8058fc90
Parents: 9f2863f
Author: Dave Johnson <snoopd...@apache.org>
Authored: Tue Oct 11 08:55:12 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Tue Oct 11 08:55:12 2016 -0400

----------------------------------------------------------------------
 .../impl/MessageCounterSerializationImpl.java | 81 ++++++++++----------
 .../distributed/QueueActorServiceTest.java | 17 ++--
 .../queue/src/test/resources/qakka.properties | 3 +
 3 files changed, 53 insertions(+), 48 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 2eb482a..ee4bab2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -123,25 +123,29 @@ public class MessageCounterSerializationImpl 
implements MessageCounterSerializat synchronized ( inMemoryCounters ) { - if ( inMemoryCounters.get( key ) == null ) { + if (inMemoryCounters.get( key ) == null) { Long value = retrieveCounterFromStorage( queueName, type ); - if ( value == null ) { + if (value == null) { incrementCounterInStorage( queueName, type, 0L ); - inMemoryCounters.put( key, new InMemoryCount( 0L )); + inMemoryCounters.put( key, new InMemoryCount( 0L ) ); } else { - inMemoryCounters.put( key, new InMemoryCount( value )); + inMemoryCounters.put( key, new InMemoryCount( value ) ); } } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + synchronized ( inMemoryCount ) { inMemoryCount.getIncrement().addAndGet( increment ); -// logger.info("Incremented Count for queue {} type {} = {}", -// queueName, type, getCounterValue( queueName, type )); + //logger.info("Incremented Count for queue {} type {} = {}", + //queueName, type, getCounterValue( queueName, type )); + + saveIfNeeded( queueName, type ); } - saveIfNeeded( queueName, type ); } @@ -152,25 +156,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat synchronized ( inMemoryCounters ) { - if ( inMemoryCounters.get( key ) == null ) { + if (inMemoryCounters.get( key ) == null) { Long value = retrieveCounterFromStorage( queueName, type ); - if ( value == null ) { + if (value == null) { decrementCounterInStorage( queueName, type, 0L ); - inMemoryCounters.put( key, new InMemoryCount( 0L )); + inMemoryCounters.put( key, new InMemoryCount( 0L ) ); } else { - inMemoryCounters.put( key, new InMemoryCount( value )); + inMemoryCounters.put( key, new InMemoryCount( value ) ); } } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); inMemoryCount.getDecrement().addAndGet( decrement ); -// logger.info("Decremented Count for queue {} type 
{} = {}", -// queueName, type, getCounterValue( queueName, type )); + //logger.info("Decremented Count for queue {} type {} = {}", + //queueName, type, getCounterValue( queueName, type )); + + saveIfNeeded( queueName, type ); } - saveIfNeeded( queueName, type ); } @@ -179,19 +188,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat String key = buildKey( queueName, type ); - synchronized ( inMemoryCounters ) { + if ( inMemoryCounters.get( key ) == null ) { - if ( inMemoryCounters.get( key ) == null ) { + Long value = retrieveCounterFromStorage( queueName, type ); - Long value = retrieveCounterFromStorage( queueName, type ); - - if ( value == null ) { - throw new NotFoundException( - MessageFormat.format( "No counter found for queue {0} type {1}", - queueName, type )); - } else { - inMemoryCounters.put( key, new InMemoryCount( value )); - } + if ( value == null ) { + throw new NotFoundException( + MessageFormat.format( "No counter found for queue {0} type {1}", + queueName, type )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); } } @@ -245,22 +251,19 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { - - if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { + if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { - long totalIncrement = inMemoryCount.getIncrement().get(); - incrementCounterInStorage( queueName, type, totalIncrement ); + long totalIncrement = inMemoryCount.getIncrement().get(); + incrementCounterInStorage( queueName, type, totalIncrement ); - long totalDecrement = inMemoryCount.getDecrement().get(); - decrementCounterInStorage( queueName, type, totalDecrement ); + long totalDecrement = inMemoryCount.getDecrement().get(); + decrementCounterInStorage( queueName, type, totalDecrement ); - inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type 
) ); - inMemoryCount.getIncrement().set( 0L ); - inMemoryCount.getDecrement().set( 0L ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); + inMemoryCount.getIncrement().set( 0L ); + inMemoryCount.getDecrement().set( 0L ); - numChanges.set( 0 ); - } + numChanges.set( 0 ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 7fe8b16..f5512e5 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -152,32 +152,31 @@ public class QueueActorServiceTest extends AbstractTest { queueName, region, region, messageId, null, null ); } - int maxRetries = 25; + int maxRetries = 10; int retries = 0; - int count = 0; + long count = 0; while (retries++ < maxRetries) { distributedQueueService.refresh(); - if ( queueMessageManager.getQueueDepth( queueName ) == 100 ) { - count = 100; + count = queueMessageManager.getQueueDepth( queueName ); + if ( count == 100 ) { break; } - count = inMemoryQueue.size( queueName ); Thread.sleep( 1000 ); } Assert.assertEquals( 100, count ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - 
Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName ) ); distributedQueueService.shutdown(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 142138d..95b2509 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -40,6 +40,9 @@ queue.num.actors=50 queue.sender.num.actors=100 queue.writer.num.actors=100 +queue.send.timeout.seconds=5 +queue.get.timeout.seconds=5 + # set shard size and times low for testing purposes queue.shard.max.size=10 queue.shard.allocation.check.frequency.millis=100