Fix QueueRefresher & concurrency problem in MessageCounterSerialization.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7f3b2dae Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7f3b2dae Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7f3b2dae Branch: refs/heads/master Commit: 7f3b2dae42d47c0422c16def1b6f2518bfc82e57 Parents: 5d37ed5 Author: Dave Johnson <[email protected]> Authored: Mon Oct 10 15:32:33 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Mon Oct 10 15:32:33 2016 -0400 ---------------------------------------------------------------------- .../distributed/actors/QueueRefresher.java | 14 +++++------ .../impl/MessageCounterSerializationImpl.java | 16 ++++++++----- .../serialization/sharding/ShardIterator.java | 25 +++++++++++++------- .../impl/ShardCounterSerializationImpl.java | 3 ++- .../distributed/actors/ShardAllocatorTest.java | 14 ++++++----- .../queue/src/test/resources/qakka.properties | 4 ++-- 6 files changed, 44 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java index afd5640..d8faeb2 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java @@ -107,15 +107,17 @@ public class QueueRefresher extends UntypedActor { ShardIterator shardIterator = new ShardIterator( cassandraClient, queueName, actorSystemFig.getRegionLocal(), - Shard.Type.DEFAULT, Optional.empty() ); + Shard.Type.DEFAULT, shardIdOptional ); UUID since = inMemoryQueue.getNewest( queueName ); String region = actorSystemFig.getRegionLocal(); + MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator( cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT, shardIterator, since); + int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName ); int count = 0; @@ -125,14 +127,10 @@ public class QueueRefresher extends UntypedActor { count++; } - if ( multiShardIterator.getCurrentShard() != null ) { - startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() ); - } + startingShards.put( shardKey, shardId ); - if ( count > 0 ) { - logger.debug( "Added {} in-memory for queue {}, new size = {}", - count, queueName, inMemoryQueue.size( queueName ) ); - } +// logger.debug("Refreshed queue {} region {} shard {} since {} found {}", +// queueName, region, shardId, since, count ); } } finally { http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 0fdb47e..2eb482a 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -134,11 +134,13 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat inMemoryCounters.put( key, new InMemoryCount( value )); } } - } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - inMemoryCount.getIncrement().addAndGet( increment ); + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + inMemoryCount.getIncrement().addAndGet( increment ); +// logger.info("Incremented Count for queue {} type {} = {}", +// queueName, type, getCounterValue( queueName, type )); + } saveIfNeeded( queueName, type ); } @@ -161,11 +163,13 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat inMemoryCounters.put( key, new InMemoryCount( value )); } } - } - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - inMemoryCount.getDecrement().addAndGet( decrement ); + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + inMemoryCount.getDecrement().addAndGet( decrement ); +// logger.info("Decremented Count for queue {} type {} = {}", +// queueName, type, getCounterValue( queueName, type )); + } saveIfNeeded( queueName, type ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java index 402d429..5f46a7e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java @@ -40,9 +40,10 @@ public class ShardIterator implements Iterator<Shard> { private final String queueName; private final String region; private final Shard.Type shardType; - private final Optional<Long> shardId; + private final Optional<Long> lastShardId; private Iterator<Shard> currentIterator; + private long currentShardId = 0L; private long nextStart = 0L; @@ -57,8 +58,10 @@ public class ShardIterator implements Iterator<Shard> { this.queueName = queueName; this.region = region; this.shardType = shardtype; - this.shardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L); + this.lastShardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L); this.cassandraClient = cassandraClient; + + this.currentShardId = this.lastShardId.get(); } @Override @@ -79,7 +82,9 @@ public class ShardIterator implements Iterator<Shard> { throw new NoSuchElementException( "No next shard exists" ); } - return currentIterator.next(); + Shard next = currentIterator.next(); + currentShardId = next.getShardId(); + return next; } @@ -90,13 +95,15 @@ public class ShardIterator implements Iterator<Shard> { Clause regionClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_REGION, region); Clause activeClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_ACTIVE, 1); Clause shardIdClause; - if(nextStart == 0L && shardId.isPresent()){ - shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, shardId.get()); - }else if( nextStart == 0L && !shardId.isPresent()){ - shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L); - }else{ - shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart); + if (nextStart == 0L && lastShardId.isPresent()) { + shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, lastShardId.get() ); + + } else if (nextStart == 0L && !lastShardId.isPresent()) { + shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L ); + + } else { + shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java index bcfb74d..f303f43 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java @@ -91,7 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization @Inject - public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { + public ShardCounterSerializationImpl( + CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) { this.cassandraConfig = cassandraConfig; this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter(); this.cassandraClient = cassandraClient; http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java index 919673c..aae5f44 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java @@ -173,12 +173,14 @@ public class ShardAllocatorTest extends AbstractTest { injector.getInstance( App.class ); // init the INJECTOR + QakkaFig qakkaFig = injector.getInstance( QakkaFig.class ); ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class ); QueueManager queueManager = injector.getInstance( QueueManager.class ); QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManager.class ); DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class ); ShardCounterSerialization shardCounterSer = injector.getInstance( ShardCounterSerialization.class ); + Assert.assertEquals( "test assumes 'queue.shard.max.size' is 15 ", 15, qakkaFig.getMaxShardSize() ); String region = actorSystemFig.getRegionLocal(); App app = injector.getInstance( App.class ); @@ -191,9 +193,9 @@ public class ShardAllocatorTest extends AbstractTest { try { - // Create 4000 messages + // Create number of messages - int numMessages = 4000; + int numMessages = 400; for (int i = 0; i < numMessages; i++) { queueMessageManager.sendMessages( @@ -208,10 +210,10 @@ public class ShardAllocatorTest extends AbstractTest { distributedQueueService.refresh(); - // Test that 8 shards were created - - Assert.assertTrue( "num shards >= 7", - countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 ); + // Test that right number of shards created + int shardCount = countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ); + Assert.assertTrue( "shards > 10", shardCount > 10 ); + Assert.assertTrue( "shards < 20", shardCount < 20 ); } finally { queueManager.deleteQueue( queueName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/resources/qakka.properties ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index ef1be80..142138d 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -34,14 +34,14 @@ usergrid.cluster.region.local=us-east usergrid.cluster.seeds=us-east:localhost # Port used for cluster communications. -usergrid.cluster.port=2551 +usergrid.cluster.port=3545 queue.num.actors=50 queue.sender.num.actors=100 queue.writer.num.actors=100 # set shard size and times low for testing purposes -queue.shard.max.size=500 +queue.shard.max.size=10 queue.shard.allocation.check.frequency.millis=100 queue.shard.allocation.advance.time.millis=200
