Fix QueueRefresher & concurrency problem in MessageCounterSerialization.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7f3b2dae
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7f3b2dae
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7f3b2dae

Branch: refs/heads/master
Commit: 7f3b2dae42d47c0422c16def1b6f2518bfc82e57
Parents: 5d37ed5
Author: Dave Johnson <[email protected]>
Authored: Mon Oct 10 15:32:33 2016 -0400
Committer: Dave Johnson <[email protected]>
Committed: Mon Oct 10 15:32:33 2016 -0400

----------------------------------------------------------------------
 .../distributed/actors/QueueRefresher.java      | 14 +++++------
 .../impl/MessageCounterSerializationImpl.java   | 16 ++++++++-----
 .../serialization/sharding/ShardIterator.java   | 25 +++++++++++++-------
 .../impl/ShardCounterSerializationImpl.java     |  3 ++-
 .../distributed/actors/ShardAllocatorTest.java  | 14 ++++++-----
 .../queue/src/test/resources/qakka.properties   |  4 ++--
 6 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index afd5640..d8faeb2 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -107,15 +107,17 @@ public class QueueRefresher extends UntypedActor {
 
                 ShardIterator shardIterator = new ShardIterator(
                     cassandraClient, queueName, 
actorSystemFig.getRegionLocal(),
-                    Shard.Type.DEFAULT, Optional.empty() );
+                    Shard.Type.DEFAULT, shardIdOptional );
 
                 UUID since = inMemoryQueue.getNewest( queueName );
 
                 String region = actorSystemFig.getRegionLocal();
+
                 MultiShardMessageIterator multiShardIterator = new 
MultiShardMessageIterator(
                     cassandraClient, queueName, region, 
DatabaseQueueMessage.Type.DEFAULT,
                     shardIterator, since);
 
+
                 int need = qakkaFig.getQueueInMemorySize() - 
inMemoryQueue.size( queueName );
                 int count = 0;
 
@@ -125,14 +127,10 @@ public class QueueRefresher extends UntypedActor {
                     count++;
                 }
 
-                if ( multiShardIterator.getCurrentShard() != null ) {
-                    startingShards.put( shardKey, 
multiShardIterator.getCurrentShard().getShardId() );
-                }
+                startingShards.put( shardKey, shardId );
 
-                if ( count > 0 ) {
-                    logger.debug( "Added {} in-memory for queue {}, new size = 
{}",
-                        count, queueName, inMemoryQueue.size( queueName ) );
-                }
+//                logger.debug("Refreshed queue {} region {} shard {} since {} 
found {}",
+//                    queueName, region, shardId, since, count );
             }
 
         } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 0fdb47e..2eb482a 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -134,11 +134,13 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
             }
-        }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
-        inMemoryCount.getIncrement().addAndGet( increment );
+            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            inMemoryCount.getIncrement().addAndGet( increment );
 
+//            logger.info("Incremented Count for queue {} type {} = {}",
+//                queueName, type, getCounterValue( queueName, type ));
+        }
         saveIfNeeded( queueName, type );
     }
 
@@ -161,11 +163,13 @@ public class MessageCounterSerializationImpl implements 
MessageCounterSerializat
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
             }
-        }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
-        inMemoryCount.getDecrement().addAndGet( decrement );
+            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            inMemoryCount.getDecrement().addAndGet( decrement );
 
+//            logger.info("Decremented Count for queue {} type {} = {}",
+//                queueName, type, getCounterValue( queueName, type ));
+        }
         saveIfNeeded( queueName, type );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
index 402d429..5f46a7e 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
@@ -40,9 +40,10 @@ public class ShardIterator implements Iterator<Shard> {
     private final String queueName;
     private final String region;
     private final Shard.Type shardType;
-    private final Optional<Long> shardId;
+    private final Optional<Long> lastShardId;
 
     private Iterator<Shard> currentIterator;
+    private long currentShardId = 0L;
 
     private long nextStart = 0L;
 
@@ -57,8 +58,10 @@ public class ShardIterator implements Iterator<Shard> {
         this.queueName = queueName;
         this.region = region;
         this.shardType = shardtype;
-        this.shardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L);
+        this.lastShardId = lastShardId.isPresent() ? lastShardId : 
Optional.of(0L);
         this.cassandraClient = cassandraClient;
+
+        this.currentShardId = this.lastShardId.get();
     }
 
     @Override
@@ -79,7 +82,9 @@ public class ShardIterator implements Iterator<Shard> {
             throw new NoSuchElementException( "No next shard exists" );
         }
 
-        return currentIterator.next();
+        Shard next = currentIterator.next();
+        currentShardId = next.getShardId();
+        return next;
 
     }
 
@@ -90,13 +95,15 @@ public class ShardIterator implements Iterator<Shard> {
         Clause regionClause = QueryBuilder.eq( 
ShardSerializationImpl.COLUMN_REGION, region);
         Clause activeClause = QueryBuilder.eq( 
ShardSerializationImpl.COLUMN_ACTIVE, 1);
         Clause shardIdClause;
-        if(nextStart == 0L && shardId.isPresent()){
-            shardIdClause = QueryBuilder.gt( 
ShardSerializationImpl.COLUMN_SHARD_ID, shardId.get());
-        }else if( nextStart == 0L && !shardId.isPresent()){
-            shardIdClause = QueryBuilder.gte( 
ShardSerializationImpl.COLUMN_SHARD_ID, 0L);
 
-        }else{
-            shardIdClause = QueryBuilder.gt( 
ShardSerializationImpl.COLUMN_SHARD_ID, nextStart);
+        if (nextStart == 0L && lastShardId.isPresent()) {
+            shardIdClause = QueryBuilder.gt( 
ShardSerializationImpl.COLUMN_SHARD_ID, lastShardId.get() );
+
+        } else if (nextStart == 0L && !lastShardId.isPresent()) {
+            shardIdClause = QueryBuilder.gte( 
ShardSerializationImpl.COLUMN_SHARD_ID, 0L );
+
+        } else {
+            shardIdClause = QueryBuilder.gt( 
ShardSerializationImpl.COLUMN_SHARD_ID, nextStart );
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index bcfb74d..f303f43 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -91,7 +91,8 @@ public class ShardCounterSerializationImpl implements 
ShardCounterSerialization
 
 
     @Inject
-    public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, 
QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public ShardCounterSerializationImpl(
+        CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient 
cassandraClient ) {
         this.cassandraConfig = cassandraConfig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 919673c..aae5f44 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -173,12 +173,14 @@ public class ShardAllocatorTest extends AbstractTest {
 
         injector.getInstance( App.class ); // init the INJECTOR
 
+        QakkaFig            qakkaFig              = injector.getInstance( 
QakkaFig.class );
         ActorSystemFig      actorSystemFig        = injector.getInstance( 
ActorSystemFig.class );
         QueueManager        queueManager          = injector.getInstance( 
QueueManager.class );
         QueueMessageManager queueMessageManager   = injector.getInstance( 
QueueMessageManager.class );
         DistributedQueueService distributedQueueService = 
injector.getInstance( DistributedQueueService.class );
         ShardCounterSerialization shardCounterSer = injector.getInstance( 
ShardCounterSerialization.class );
 
+        Assert.assertEquals( "test assumes 'queue.shard.max.size' is 15 ", 15, 
qakkaFig.getMaxShardSize() );
 
         String region = actorSystemFig.getRegionLocal();
         App app = injector.getInstance( App.class );
@@ -191,9 +193,9 @@ public class ShardAllocatorTest extends AbstractTest {
 
         try {
 
-            // Create 4000 messages
+            // Create number of messages
 
-            int numMessages = 4000;
+            int numMessages = 400;
 
             for (int i = 0; i < numMessages; i++) {
                 queueMessageManager.sendMessages(
@@ -208,10 +210,10 @@ public class ShardAllocatorTest extends AbstractTest {
 
             distributedQueueService.refresh();
 
-            // Test that 8 shards were created
-
-            Assert.assertTrue( "num shards >= 7",
-                countShards( cassandraClient, shardCounterSer, queueName, 
region, Shard.Type.DEFAULT ) >= 7 );
+            // Test that right number of shards created
+            int shardCount = countShards( cassandraClient, shardCounterSer, 
queueName, region, Shard.Type.DEFAULT );
+            Assert.assertTrue( "shards > 10", shardCount > 10 );
+            Assert.assertTrue( "shards < 20", shardCount < 20 );
 
         } finally {
             queueManager.deleteQueue( queueName );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties 
b/stack/corepersistence/queue/src/test/resources/qakka.properties
index ef1be80..142138d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -34,14 +34,14 @@ usergrid.cluster.region.local=us-east
 usergrid.cluster.seeds=us-east:localhost
 
 # Port used for cluster communications.
-usergrid.cluster.port=2551
+usergrid.cluster.port=3545
 
 queue.num.actors=50
 queue.sender.num.actors=100
 queue.writer.num.actors=100
 
 # set shard size and times low for testing purposes
-queue.shard.max.size=500
+queue.shard.max.size=10
 queue.shard.allocation.check.frequency.millis=100
 queue.shard.allocation.advance.time.millis=200
 

Reply via email to