Switch DISTRIBUTED database queueing to not cache in memory by default, as the in-memory impl causes duplicate message processing quite often at the moment.
- includes making all the tests work without in-memory queue fronting the database queue which really means adding some more delay in tests - the tests now are actually faster now because the original refreshIndex() created and queried random entities which took longer in most cases - uncommented the checkReceipts function in Notification tests as these are now passing, with an added fix for duplicate receipt creation - some logging updates in the distributed database queueing impl (Qakka) - increased the default take to 500 from the queue when DISTRIBUTED database queueing is configured ( which is the default now ) - Notifications Queue Listener thread names have a random identifier in included - reduced the DISTRIBUTED database queueing default long poll to 1 second from 5 seconds Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d3e988bc Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d3e988bc Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d3e988bc Branch: refs/heads/master Commit: d3e988bcbb7eb417c84cfda7396ec3506521aa37 Parents: 8b63aae Author: Michael Russo <[email protected]> Authored: Sun Apr 2 16:14:05 2017 -0700 Committer: Michael Russo <[email protected]> Committed: Sun Apr 2 16:14:05 2017 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 44 +--------- .../asyncevents/AsyncIndexProvider.java | 5 ++ .../java/org/apache/usergrid/Application.java | 4 +- .../org/apache/usergrid/CoreApplication.java | 22 ++--- .../org/apache/usergrid/CoreITSetupImpl.java | 20 ++++- .../org/apache/usergrid/TestEntityIndex.java | 1 + .../corepersistence/AggregationServiceTest.java | 15 ++-- .../corepersistence/StaleIndexCleanupTest.java | 17 ++-- .../persistence/ApplicationServiceIT.java | 6 +- .../usergrid/persistence/CollectionIT.java | 74 ++++++++-------- .../usergrid/persistence/CountingMutatorIT.java | 4 +- 
.../persistence/EntityConnectionsIT.java | 16 ++-- .../usergrid/persistence/EntityManagerIT.java | 20 ++--- .../org/apache/usergrid/persistence/GeoIT.java | 32 +++---- .../persistence/GeoQueryBooleanTest.java | 4 +- .../apache/usergrid/persistence/IndexIT.java | 18 ++-- .../usergrid/persistence/PathQueryIT.java | 7 +- .../usergrid/persistence/PermissionsIT.java | 6 +- .../usergrid/persistence/RebuildIndexTest.java | 26 +++--- .../cassandra/EntityManagerFactoryImplIT.java | 13 +-- .../persistence/query/ConnectionHelper.java | 2 +- .../query/IntersectionTransitivePagingIT.java | 2 +- .../query/IntersectionUnionPagingIT.java | 2 +- .../persistence/query/IteratingQueryIT.java | 32 +++---- .../persistence/query/NotSubPropertyIT.java | 2 +- .../persistence/query/ParenthesisProblemIT.java | 2 +- .../resources/usergrid-custom-test.properties | 3 + .../usergrid/persistence/qakka/QakkaFig.java | 4 +- .../qakka/core/impl/InMemoryQueue.java | 4 +- .../core/impl/QueueMessageManagerImpl.java | 9 +- .../distributed/actors/QueueActorHelper.java | 15 +++- .../distributed/actors/QueueActorRouter.java | 2 +- .../distributed/actors/ShardAllocator.java | 4 +- .../impl/DistributedQueueServiceImpl.java | 26 ++++-- .../distributed/actors/ShardAllocatorTest.java | 3 + .../queue/src/test/resources/qakka.properties | 5 +- .../usergrid/rest/CollectionMetadataIT.java | 4 +- .../apache/usergrid/rest/NotificationsIT.java | 14 ++- .../apache/usergrid/rest/PartialUpdateTest.java | 6 +- .../apache/usergrid/rest/SystemResourceIT.java | 3 +- .../rest/applications/ApplicationCreateIT.java | 2 +- .../rest/applications/ApplicationDeleteIT.java | 6 +- .../applications/ApplicationResourceIT.java | 12 +-- .../applications/assets/AssetResourceIT.java | 26 +++--- .../applications/assets/AwsAssetResourceIT.java | 22 ++--- .../collection/BrowserCompatibilityTest.java | 2 +- .../collection/CollectionsResourceIT.java | 70 +++++++-------- .../collection/DuplicateNameIT.java | 2 +- 
.../activities/ActivityResourceIT.java | 8 +- .../collection/activities/PutTest.java | 6 +- .../collection/devices/DevicesResourceIT.java | 12 +-- .../collection/groups/GroupResourceIT.java | 39 +++++---- .../collection/paging/PagingResourceIT.java | 10 +-- .../users/ConnectionResourceTest.java | 22 ++--- .../collection/users/OwnershipResourceIT.java | 24 +++--- .../collection/users/PermissionsResourceIT.java | 58 ++++++------- .../collection/users/RetrieveUsersTest.java | 8 +- .../collection/users/UserResourceIT.java | 89 ++++++++++---------- .../applications/events/EventsResourceIT.java | 6 +- .../applications/queries/BasicGeoTests.java | 4 +- .../applications/queries/GeoPagingTest.java | 13 +-- .../applications/queries/MatrixQueryTests.java | 5 +- .../rest/applications/queries/OrderByTest.java | 6 +- .../applications/queries/QueryTestBase.java | 2 +- .../queries/SelectMappingsQueryTest.java | 14 +-- .../usergrid/rest/management/AccessTokenIT.java | 8 +- .../usergrid/rest/management/AdminUsersIT.java | 33 ++++---- .../rest/management/ExportResourceIT.java | 2 +- .../rest/management/ImportResourceIT.java | 14 +-- .../rest/management/ManagementResourceIT.java | 18 ++-- .../rest/management/OrganizationsIT.java | 16 ++-- .../rest/management/RegistrationIT.java | 4 +- .../rest/test/resource/AbstractRestIT.java | 12 ++- .../resources/usergrid-custom-test.properties | 7 +- .../services/notifications/QueueListener.java | 15 ++-- .../services/notifications/gcm/GCMAdapter.java | 3 +- .../org/apache/usergrid/ServiceApplication.java | 2 +- .../apache/usergrid/management/EmailFlowIT.java | 2 +- .../org/apache/usergrid/management/RoleIT.java | 5 +- .../usergrid/services/CollectionServiceIT.java | 2 +- .../usergrid/services/GroupServiceIT.java | 5 +- .../usergrid/services/ServiceInvocationIT.java | 6 +- .../AbstractServiceNotificationIT.java | 11 +-- .../apns/NotificationsServiceIT.java | 40 +++++---- .../gcm/NotificationsServiceIT.java | 15 ++-- 
.../resources/usergrid-custom-test.properties | 15 ++-- 86 files changed, 605 insertions(+), 596 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 1071842..cdb4fc7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -3089,52 +3089,16 @@ public class CpEntityManager implements EntityManager { managerCache.getEntityIndex(applicationScope).addIndex(newIndexName, shards, replicas, writeConsistency); } + @Override public void initializeIndex(){ managerCache.getEntityIndex(applicationScope).initialize(); } - /** - * TODO, these 3 methods are super janky. 
During refactoring we should clean this model up - */ - public EntityIndex.IndexRefreshCommandInfo refreshIndex() { - try { - long start = System.currentTimeMillis(); - // refresh special indexes without calling EntityManager refresh because stack overflow - Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>(); - map.put("some prop", "test"); - boolean hasFinished = false; - Entity refreshEntity = create("refresh", map); - EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo - = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); - try { - for (int i = 0; i < 20; i++) { - if (searchCollection( - new SimpleEntityRef( - org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()), - InflectionUtils.pluralize("refresh"), - Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'") - ).size() > 0 - ) { - hasFinished = true; - break; - } - int sleepTime = 500; - logger.info("Sleeping {} ms during refreshIndex", sleepTime); - Thread.sleep(sleepTime); - indexRefreshCommandInfo - = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); - } - if(!hasFinished){ - throw new RuntimeException("Did not find entity {} during refresh. 
uuid->"+refreshEntity.getUuid()); - } - }finally { - delete(refreshEntity); - } - Thread.sleep(100); - - return indexRefreshCommandInfo; + public EntityIndex.IndexRefreshCommandInfo refreshIndex() { + try { + return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); } catch (Exception e) { throw new RuntimeException("refresh failed",e); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 81960f5..2ba6c0b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -36,6 +36,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; +import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.DISTRIBUTED; import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL; @@ -121,6 +122,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { asyncEventService.MAX_TAKE = 1000; } + if ( impl.equals( DISTRIBUTED )) { + asyncEventService.MAX_TAKE = 500; + } + return asyncEventService; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/Application.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/Application.java b/stack/core/src/test/java/org/apache/usergrid/Application.java index 378a4f7..102ee9c 100644 --- 
a/stack/core/src/test/java/org/apache/usergrid/Application.java +++ b/stack/core/src/test/java/org/apache/usergrid/Application.java @@ -152,7 +152,9 @@ public interface Application extends TestRule { public void remove( EntityRef entityRef ) throws Exception; - public void refreshIndex(); + public void waitForQueueDrainAndRefreshIndex(int waitTimeMillis); + + public void waitForQueueDrainAndRefreshIndex(); /** * Get the entity manager http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java index 9046f02..f505ead 100644 --- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java +++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java @@ -181,8 +181,8 @@ public class CoreApplication implements Application, TestRule { logger.info( "Created new application {} in organization {}", appName, orgName ); -// //wait for the index before proceeding -// em.refreshIndex(); + //wait for the index before proceeding + waitForQueueDrainAndRefreshIndex(500); } @@ -223,19 +223,21 @@ public class CoreApplication implements Application, TestRule { return em.get( new SimpleEntityRef( type, id ) ); } - @Override - public synchronized void refreshIndex() { - //Insert test entity and find it - setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid()); - - if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) { - setup.getEmf().refreshIndex(em.getApplicationId()); + public synchronized void waitForQueueDrainAndRefreshIndex(int waitTimeMillis) { + try{ + Thread.sleep(waitTimeMillis); + } catch (InterruptedException e ){ + //noop } - em.refreshIndex(); } + @Override + public synchronized void 
waitForQueueDrainAndRefreshIndex() { + waitForQueueDrainAndRefreshIndex(750); + } + @Override public EntityManager getEntityManager() { http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java index 64b001c..bd6ae3e 100644 --- a/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java +++ b/stack/core/src/test/java/org/apache/usergrid/CoreITSetupImpl.java @@ -156,16 +156,28 @@ public class CoreITSetupImpl implements CoreITSetup, TestEntityIndex { @Override public void refresh(UUID appId){ try { - Thread.sleep(50); - } catch (InterruptedException ie){ + Thread.sleep(125); + + } catch (InterruptedException ie){ + //noop } + emf.refreshIndex(appId); + } + + @Override + public void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis){ try { - Thread.sleep(50); - } catch (InterruptedException ie){ + Thread.sleep(waitTimeMillis); + + } catch (InterruptedException ie){ + //noop } + + emf.refreshIndex(appId); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java index 7da187a..e5e979e 100644 --- a/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java +++ b/stack/core/src/test/java/org/apache/usergrid/TestEntityIndex.java @@ -26,4 +26,5 @@ import java.util.UUID; */ public interface TestEntityIndex { void refresh(UUID appId); + void waitForQueueDrainAndRefresh(UUID appId, int waitTimeMillis); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java index 9f1c9a4..55ce26e 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/AggregationServiceTest.java @@ -48,8 +48,8 @@ public class AggregationServiceTest extends AbstractCoreIT { props.put("name", "myname"); Entity entity1 = this.app.getEntityManager().create("test", props); Entity entity2 = this.app.getEntityManager().create("test2", props); - this.app.refreshIndex(); - Thread.sleep(500); + + this.app.waitForQueueDrainAndRefreshIndex(500); long sum = aggregationService.getApplicationSize(applicationScope); @@ -57,23 +57,24 @@ public class AggregationServiceTest extends AbstractCoreIT { Assert.assertTrue(sum > (entity1.getSize() + entity2.getSize())); long sum1 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests")); - Assert.assertEquals(sum1, entity1.getSize()); + Assert.assertEquals(entity1.getSize(), sum1); long sum2 = aggregationService.getSize(applicationScope, CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "test2s")); - Assert.assertEquals(sum2, entity2.getSize()); + Assert.assertEquals(entity2.getSize(), sum2); props = new HashMap<>(); props.put("test", 1234); props.put("name", "myname2"); Entity entity3 = this.app.getEntityManager().create("test", props); - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(500); + long sum3 = aggregationService.getSize(applicationScope, 
CpNamingUtils.createCollectionSearchEdge(applicationScope.getApplication(), "tests")); - Assert.assertEquals(sum3, entity1.getSize() + entity3.getSize()); + Assert.assertEquals(entity1.getSize() + entity3.getSize(), sum3); Map<String,Long> sumEach = aggregationService.getEachCollectionSize(applicationScope); Assert.assertTrue(sumEach.containsKey("tests") && sumEach.containsKey("test2s")); - Assert.assertEquals(sum3, (long) sumEach.get("tests")); + Assert.assertEquals((long) sumEach.get("tests"), sum3); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index abe2615..0abd7a2 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -105,7 +104,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { Entity thing = em.create("thing", new HashMap<String, Object>() {{ put("name", "thing1"); }}); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); assertEquals(1, queryCollectionCp("things", "thing", "select *").size()); @@ -116,7 +115,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { em.updateProperties(thing, new HashMap<String, Object>() {{ put("stuff", "widget"); }}); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); 
org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing); @@ -161,7 +160,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { }})); Thread.sleep( writeDelayMs ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); CandidateResults crs = queryCollectionCp( "things", "thing", "select *"); Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() ); @@ -210,7 +209,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { Thread.sleep(250); // delete happens asynchronously, wait for some time //refresh the app index - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(250); // refresh happens asynchronously, wait for some time @@ -231,7 +230,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { } }); //refresh the app index - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); crs = queryCollectionCp("things", "thing", "select *"); @@ -265,7 +264,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { put("name", dogName); }})); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); CandidateResults crs = queryCollectionCp( "dogs", "dog", "select *"); Assert.assertEquals("Expect no stale candidates yet", numEntities, crs.size()); @@ -288,7 +287,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // wait for indexes to be cleared for the deleted entities count = 0; @@ -296,7 +295,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { do { //trigger the repair queryCollectionEm("dogs", "select * order by created"); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); crs = queryCollectionCp("dogs", "dog", "select *"); } while ( crs.size() != numEntities && count++ < 15 ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java index 9ad90eb..547691f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java @@ -23,7 +23,6 @@ import com.google.common.base.Optional; import com.google.inject.Injector; import org.apache.usergrid.AbstractCoreIT; import org.apache.usergrid.cassandra.SpringResource; -import org.apache.usergrid.corepersistence.service.AggregationService; import org.apache.usergrid.corepersistence.service.AggregationServiceFactory; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -32,7 +31,6 @@ import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Id; import org.junit.Assert; @@ -65,7 +63,7 @@ public class ApplicationServiceIT extends AbstractCoreIT { map.put("somekey", UUID.randomUUID()); Entity entity = entityManager.create("tests", map); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(500); ApplicationScope appScope = CpNamingUtils.getApplicationScope(entityManager.getApplicationId()); Observable<Id> ids = @@ -76,7 +74,7 @@ public class ApplicationServiceIT extends AbstractCoreIT { this.app.getApplicationService().deleteAllEntities(appScope, 5); count = ids.count().toBlocking().last(); 
Assert.assertEquals(count, 5); - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); Injector injector = SpringResource.getInstance().getBean(Injector.class); GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class); GraphManager graphManager = factory.createEdgeManager(appScope); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java index 3305e0e..f484f4f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java @@ -132,7 +132,7 @@ public class CollectionIT extends AbstractCoreIT { activity3 = app.get( activity3.getUuid(), activity3.getType() ); app.addToCollection( user, "activities", activity3 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // empty query Query query = new Query(); @@ -259,7 +259,7 @@ public class CollectionIT extends AbstractCoreIT { activity3 = app.get(activity3.getUuid(), activity3.getType()); app.addToCollection(user, "activities", activity3); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // empty query Query query = new Query(); @@ -295,7 +295,7 @@ public class CollectionIT extends AbstractCoreIT { Entity user = em.create( "user", properties ); assertNotNull( user ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef Query query = Query.fromQL( "firstname = '" + firstName + "'" ); @@ -315,7 +315,7 @@ public class CollectionIT extends AbstractCoreIT { em.update( user ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // search with the old username, should be no results query = Query.fromQL( "firstname 
= '" + firstName + "'" ); @@ -354,7 +354,7 @@ public class CollectionIT extends AbstractCoreIT { Entity user = em.create( "user", properties ); assertNotNull( user ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef final Query query = Query.fromQL( "middlename = '" + middleName + "'" ); @@ -386,7 +386,7 @@ public class CollectionIT extends AbstractCoreIT { Entity user = em.create( "user", properties ); assertNotNull( user ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef final Query query = Query.fromQL( "lastname = '" + lastName + "'" ); @@ -434,7 +434,7 @@ public class CollectionIT extends AbstractCoreIT { properties.put("nickname", "ed"); em.updateProperties(user1, properties); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); final Query query = Query.fromQL( "nickname = 'ed'" ); @@ -469,7 +469,7 @@ public class CollectionIT extends AbstractCoreIT { Entity group = em.create( "group", properties ); assertNotNull( group ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef final Query query = Query.fromQL( "name = '" + groupName + "'" ); @@ -501,7 +501,7 @@ public class CollectionIT extends AbstractCoreIT { Entity group = em.create( "group", properties ); assertNotNull( group ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef @@ -559,7 +559,7 @@ public class CollectionIT extends AbstractCoreIT { em.addToCollection( user, "activities", em.create( "activity", properties ) ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final Query query = Query.fromQL( "verb = 'post'" ); @@ -593,7 +593,7 @@ public class CollectionIT extends AbstractCoreIT { Entity user2 = em.create( "user", properties ); assertNotNull( user2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef Query query = new Query(); @@ -636,7 +636,7 @@ public class CollectionIT extends AbstractCoreIT { Entity user2 = 
em.create( "user", properties ); assertNotNull( user2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef Query query = new Query(); @@ -677,7 +677,7 @@ public class CollectionIT extends AbstractCoreIT { Entity game2 = em.create( "orquerygame", properties ); assertNotNull( game2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // EntityRef Query query = Query @@ -756,7 +756,7 @@ public class CollectionIT extends AbstractCoreIT { Entity game2 = em.create( "game", properties ); assertNotNull( game2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // overlap Query query = Query.fromQL( @@ -817,7 +817,7 @@ public class CollectionIT extends AbstractCoreIT { Entity game2 = em.create( "game", properties ); assertNotNull( game2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // simple not Query query = Query.fromQL( "select * where NOT keywords contains 'game'" ); @@ -893,7 +893,7 @@ public class CollectionIT extends AbstractCoreIT { Entity entity2 = em.create( "game", properties ); assertNotNull( entity2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // search for games without sub-field Foo should returned zero entities @@ -949,7 +949,7 @@ public class CollectionIT extends AbstractCoreIT { properties.put("keywords", "Action, New"); em.create( "game", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "select * where keywords contains 'hot' or title contains 'hot'" ); Results r = em.searchCollection( em.getApplicationRef(), "games", query ); @@ -980,7 +980,7 @@ public class CollectionIT extends AbstractCoreIT { properties.put( "keywords", "Action, New" ); Entity thirdGame = em.create( "game", properties ); - app.refreshIndex();//need to track all batches then resolve promises + app.waitForQueueDrainAndRefreshIndex();//need to track all batches then resolve promises Query query = Query.fromQL( "select * where keywords 
contains 'new' and title contains 'extreme'" ); Results r = em.searchCollection( em.getApplicationRef(), "games", query ); @@ -1011,7 +1011,7 @@ public class CollectionIT extends AbstractCoreIT { entityIds.add( created.getUuid() ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = new Query(); query.setLimit( 50 ); @@ -1039,7 +1039,7 @@ public class CollectionIT extends AbstractCoreIT { numDeleted++; } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // wait for indexes to be cleared Thread.sleep(1000); //TODO find why we have to wait. This is a bug @@ -1096,7 +1096,7 @@ public class CollectionIT extends AbstractCoreIT { int pageSize = 10; - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final Query query = Query.fromQL( "index < " + size * 2 + " order by index asc" ); Results r = null; @@ -1147,7 +1147,7 @@ public class CollectionIT extends AbstractCoreIT { int pageSize = 10; - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "select * where index >= " + size / 2 + " sort by index asc" ); query.setLimit( pageSize ); @@ -1201,7 +1201,7 @@ public class CollectionIT extends AbstractCoreIT { entityIds.add( created.getUuid() ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); int pageSize = 10; @@ -1254,7 +1254,7 @@ public class CollectionIT extends AbstractCoreIT { entityIds.add( created.getUuid() ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); int pageSize = 5; @@ -1310,7 +1310,7 @@ public class CollectionIT extends AbstractCoreIT { Entity saved = em.create( "test", root ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "rootprop1 = 'simpleprop'" ); Entity entity; @@ -1357,7 +1357,7 @@ public class CollectionIT extends AbstractCoreIT { Entity saved = em.create( "test", jsonData ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "intprop = 10" 
); @@ -1416,7 +1416,7 @@ public class CollectionIT extends AbstractCoreIT { Entity saved = em.create( "test", props ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "myString = 'My simple string'" ); @@ -1441,7 +1441,7 @@ public class CollectionIT extends AbstractCoreIT { em.create( "user", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); String s = "select username, email where username = 'edanuff'"; Query query = Query.fromQL( s ); @@ -1471,7 +1471,7 @@ public class CollectionIT extends AbstractCoreIT { em.create( "user", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); String s = "select {name: username, email: email} where username = 'edanuff'"; Query query = Query.fromQL( s ); @@ -1503,7 +1503,7 @@ public class CollectionIT extends AbstractCoreIT { final Entity entity = em.create( "user", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); String s = "select * where username = '[email protected]'"; Query query = Query.fromQL( s ); @@ -1525,7 +1525,7 @@ public class CollectionIT extends AbstractCoreIT { em.createConnection( foo, "testconnection", entity ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // now query via the testConnection, this should work @@ -1569,7 +1569,7 @@ public class CollectionIT extends AbstractCoreIT { em.create( "loveobject", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); location = new LinkedHashMap<String, Object>(); location.put( "Place", "Via Pietro Maroncelli, 48, 62012 Santa Maria Apparente Province of Macerata, Italy" ); @@ -1587,7 +1587,7 @@ public class CollectionIT extends AbstractCoreIT { em.create( "loveobject", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // String s = "select * where Flag = 'requested'"; // String s = "select * where Flag = 'requested' and NOT Recipient.Username = @@ -1632,7 +1632,7 @@ public class 
CollectionIT extends AbstractCoreIT { createdEntities.add( created ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getCollection( em.getApplicationRef(), "users", null, 50, Level.ALL_PROPERTIES, false ); @@ -1729,7 +1729,7 @@ public class CollectionIT extends AbstractCoreIT { Entity game2 = em.create( "game", properties ); assertNotNull( game2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // overlap Query query = new Query(); @@ -1763,7 +1763,7 @@ public class CollectionIT extends AbstractCoreIT { Entity game2 = em.create( "game", properties ); assertNotNull( game2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // overlap Query query = new Query(); @@ -1797,7 +1797,7 @@ public class CollectionIT extends AbstractCoreIT { Entity createUser2 = em.create( user2 ); assertNotNull( createUser2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // overlap Query query = new Query(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java index 63c7cb8..596ec7c 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java @@ -74,7 +74,7 @@ public class CountingMutatorIT extends AbstractCoreIT { properties.put( "username", "testuser" ); properties.put( "email", "[email protected]" ); Entity created = em.create( "user", properties ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Entity returned = em.get( created.getUuid() ); @@ -89,7 +89,7 @@ public class CountingMutatorIT extends AbstractCoreIT { Entity connectedEntity = 
em.create( "user", connectedProps ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // Connect from our new entity to our root one so it's updated when paging em.createConnection( connectedEntity, "following", returned ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java index 296bf53..e1e24c4 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java @@ -64,7 +64,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { assertEquals( 1, connectionTypes.size()); assertEquals("likes", connectionTypes.iterator().next()); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.IDS); @@ -128,7 +128,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catB.getUuid() + "\n" ); em.createConnection( awardA, "awarded", catB ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // List forward connections for cat A @@ -149,7 +149,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { logger.info( "\n\nConnecting " + awardA.getUuid() + " \"awarded\" " + catA.getUuid() + "\n" ); em.createConnection( awardA, "awarded", catA ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // List forward connections for cat A // Not valid with current usages @@ -256,7 +256,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { em.createConnection( secondUserEntity, "likes", arrogantbutcher ); - 
app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getTargetEntities(firstUserEntity, "likes", "restaurant", Level.IDS); @@ -310,7 +310,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { em.createConnection( fredEntity, "likes", wilmaEntity ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // // search for "likes" edges from fred // assertEquals( 1, @@ -363,7 +363,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { em.createConnection( fredEntity, "likes", JohnEntity ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // now query via the testConnection, this should work @@ -410,7 +410,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getTargetEntities(firstUserEntity, "likes", null, Level.ALL_PROPERTIES) ; @@ -453,7 +453,7 @@ public class EntityConnectionsIT extends AbstractCoreIT { // // em.createConnection( fredEntity, "likes", wilmaEntity ); // -// app.refreshIndex(); +// app.waitForQueueDrainAndRefreshIndex(); // //// // search for "likes" edges from fred //// assertEquals( 1, http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java index cb3a728..e1e4a05 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java @@ -75,7 +75,7 @@ public class EntityManagerIT extends AbstractCoreIT { assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) ); assertEquals( "user.email not expected value", "[email protected]", 
user.getProperty( "email" ) ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" ); @@ -274,13 +274,13 @@ public class EntityManagerIT extends AbstractCoreIT { Entity thing = em.create( "thing", properties ); logger.info( "Entity created" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); logger.info( "Starting entity delete" ); em.delete( thing ); logger.info( "Entity deleted" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // now search by username, no results should be returned @@ -310,13 +310,13 @@ public class EntityManagerIT extends AbstractCoreIT { Entity user = em.create( "user", properties ); logger.info( "Entity created" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); logger.info( "Starting entity delete" ); em.delete( user ); logger.info( "Entity deleted" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // now search by username, no results should be returned @@ -335,7 +335,7 @@ public class EntityManagerIT extends AbstractCoreIT { user = em.create( "user", properties ); logger.info( "Entity created" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final Query userNameQuery = Query.fromQL( "username = '" + name + "'" ); @@ -456,7 +456,7 @@ public class EntityManagerIT extends AbstractCoreIT { EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getCollection( appRef, "things", null, 10, Level.ALL_PROPERTIES, false ); @@ -548,7 +548,7 @@ public class EntityManagerIT extends AbstractCoreIT { Entity createdDevice = em.createItemInCollection( createdUser, "devices", "device", device ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) ); @@ -580,7 
+580,7 @@ public class EntityManagerIT extends AbstractCoreIT { Entity user = em.create( "robot", properties ); assertNotNull( user ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); assertNotNull( em.get( user.getUuid() ) ); } @@ -608,7 +608,7 @@ public class EntityManagerIT extends AbstractCoreIT { em.addToCollection(appRef, "fluffies", entityRef); em.addToCollection(appRef, "fluffies", entityRef); - //app.refreshIndex(); + //app.waitForQueueDrainAndRefreshIndex(); Results results = em.getCollection(appRef, "fluffies", null, 10, Level.ALL_PROPERTIES, true); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java index b7d708e..df77084 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java @@ -96,7 +96,7 @@ public class GeoIT extends AbstractCoreIT { assertNotNull(hotel); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //2. Query with a globally large distance to verify location @@ -142,7 +142,7 @@ public class GeoIT extends AbstractCoreIT { }}; Entity user = em.create("user", properties); assertNotNull(user); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //2. Query with a globally large distance to verify location Query query = Query.fromQL("select * where location within " + Integer.MAX_VALUE + " of 0, 0"); @@ -154,7 +154,7 @@ public class GeoIT extends AbstractCoreIT { user.getDynamicProperties().remove("location"); em.updateProperties(user, properties); em.update(user); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //4. 
Repeat the query, expecting no results listResults = em.searchCollection(em.getApplicationRef(), "users", query); @@ -188,7 +188,7 @@ public class GeoIT extends AbstractCoreIT { }}; Entity user = em.create("user", properties); assertNotNull(user); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final double lat = 37.776753; final double lon = -122.407846; @@ -234,7 +234,7 @@ public class GeoIT extends AbstractCoreIT { }}; Entity user = em.create("user", properties); assertNotNull(user); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final double lat = 37.776753; final double lon = -122.407846; @@ -284,12 +284,12 @@ public class GeoIT extends AbstractCoreIT { Entity user = em.create("user", userProperties); assertNotNull(user); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //3. Create a connection between the user and the entity em.createConnection(user, "likes", restaurant); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //4. Test that the user is within 2000m of the entity Results emSearchResults = em.searchTargetEntities(user, Query.fromQL("location within 5000 of " @@ -326,7 +326,7 @@ public class GeoIT extends AbstractCoreIT { assertNotNull(entity); logger.debug("Entity {} created", entity.getProperty("name")); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //2. validate the size of the result Query query = new Query(); Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query); @@ -367,7 +367,7 @@ public class GeoIT extends AbstractCoreIT { assertNotNull(entity); logger.debug("Entity {} created", entity.getProperty("name")); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //2. 
validate the size of the result Query query = new Query(); Results listResults = em.searchCollection(em.getApplicationRef(), "stores", query); @@ -540,7 +540,7 @@ public class GeoIT extends AbstractCoreIT { em.create("store", data); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers // just to be save @@ -596,7 +596,7 @@ public class GeoIT extends AbstractCoreIT { em.create("store", data); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(2000); // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers @@ -669,7 +669,7 @@ public class GeoIT extends AbstractCoreIT { em.create("store", data); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // earth's circumference is 40,075 kilometers. Up it to 50,000kilometers // just to be save @@ -729,7 +729,7 @@ public class GeoIT extends AbstractCoreIT { created.add(e); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); int startDelta = size - min; @@ -794,7 +794,7 @@ public class GeoIT extends AbstractCoreIT { em.create("store", data); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //do a direct geo iterator test. We need to make sure that we short circuit on the correct tile. @@ -838,7 +838,7 @@ public class GeoIT extends AbstractCoreIT { assertNotNull(entity); } //3. refresh the index - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //4. 
return the entity manager return em; } @@ -857,7 +857,7 @@ public class GeoIT extends AbstractCoreIT { latlong.put("longitude", longitude); em.setProperty(entity, "location", latlong); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java index 9a3f5a6..609f977 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoQueryBooleanTest.java @@ -79,7 +79,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT { Entity user2 = em.create( "user", properties ); assertNotNull( user2 ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // define center point about 300m from that location final double lat = 37.774277; @@ -158,7 +158,7 @@ public class GeoQueryBooleanTest extends AbstractCoreIT { Entity userFred = em.create( "user", properties ); assertNotNull( userFred ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // define center point about 300m from that location final double lat = 37.774277; http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java index d62f88e..5933b57 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java @@ -60,7 
+60,7 @@ public class IndexIT extends AbstractCoreIT { em.create( "item", properties ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); int i = 0; @@ -133,7 +133,7 @@ public class IndexIT extends AbstractCoreIT { em.create( "item", properties ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "name < 'delta' order by name asc" ); Results r = em.searchCollection( em.getApplicationRef(), "items", query ); @@ -261,7 +261,7 @@ public class IndexIT extends AbstractCoreIT { em.create( "item", properties ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "group = 1 order by name desc" ); Results r = em.searchCollection( em.getApplicationRef(), "items", query ); @@ -290,7 +290,7 @@ public class IndexIT extends AbstractCoreIT { em.create("names", entity1); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //should return valid values Query query = Query.fromQL("select status where status = 'pickled'"); @@ -338,7 +338,7 @@ public class IndexIT extends AbstractCoreIT { em.createConnection( entity2Ref, "connecting", entity1Ref ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //should return valid values Query query = Query.fromQL( "select * where status = 'pickled'" ); @@ -357,7 +357,7 @@ public class IndexIT extends AbstractCoreIT { em.update( entity1Ref ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //query and check the status has been updated, shouldn't return results query = Query.fromQL( "select * where status = 'pickled'" ); @@ -413,7 +413,7 @@ public class IndexIT extends AbstractCoreIT { em.createConnection( entity2Ref, "connecting", entity1Ref ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //should return valid values Query query = Query.fromQL( "select * where status = 'pickled'" ); @@ -432,7 +432,7 @@ public class IndexIT extends AbstractCoreIT { em.update( entity1Ref ); - 
app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); //query and check the status has been updated, shouldn't return results query = Query.fromQL( "select * where status = 'pickled'" ); @@ -500,7 +500,7 @@ public class IndexIT extends AbstractCoreIT { }}; em.create("names", entity2); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // simple single-field select mapping { http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java index e6ecf97..329a5be 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java @@ -28,7 +28,6 @@ import java.util.UUID; import org.junit.Test; import org.apache.usergrid.AbstractCoreIT; -import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.Query.Level; import org.apache.usergrid.persistence.model.util.UUIDGenerator; @@ -63,7 +62,7 @@ public class PathQueryIT extends AbstractCoreIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); @@ -135,7 +134,7 @@ public class PathQueryIT extends AbstractCoreIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // pick an arbitrary group, ensure it has 7 users Results ru = em.getCollection( groups.get( 2 ), "users", null, 20, Level.IDS, false ); @@ -152,7 +151,7 @@ public class PathQueryIT extends AbstractCoreIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); // pick an arbitrary user, ensure it has 7 devices Results rd = em.getCollection( users.get( 6 ), "devices", null, 20, Level.IDS, false ); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java index 11f0692..1072d29 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PermissionsIT.java @@ -28,8 +28,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang3.RandomStringUtils; - import org.apache.usergrid.AbstractCoreIT; import org.apache.usergrid.persistence.entities.Role; import org.apache.usergrid.persistence.Query.Level; @@ -147,7 +145,7 @@ public class PermissionsIT extends AbstractCoreIT { dump( "group roles", roles ); em.deleteGroupRole( group.getUuid(), "author" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); roles = em.getGroupRoles( group.getUuid() ); @@ -156,7 +154,7 @@ public class PermissionsIT extends AbstractCoreIT { em.addUserToGroupRole( user.getUuid(), group.getUuid(), "admin" ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Results r = em.getUsersInGroupRole( group.getUuid(), "admin", Level.ALL_PROPERTIES ); dump( "entities", r.getEntities() ); assertEquals( "proper number of users in group role not set", 1, r.size() ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java index 383d620..a7759de 100644 --- 
a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java @@ -125,7 +125,7 @@ public class RebuildIndexTest extends AbstractCoreIT { } logger.info( "Created {} entities", ENTITIES_TO_INDEX ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(5000); // ----------------- test that we can read them, should work fine @@ -163,6 +163,7 @@ public class RebuildIndexTest extends AbstractCoreIT { waitForRebuild( status, reIndexService ); + app.waitForQueueDrainAndRefreshIndex(5000); // ----------------- test that we can read the catherder collection and not the catshepard @@ -233,7 +234,7 @@ public class RebuildIndexTest extends AbstractCoreIT { } logger.info( "Created {} entities", ENTITIES_TO_INDEX ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(15000); // ----------------- test that we can read them, should work fine @@ -247,6 +248,8 @@ public class RebuildIndexTest extends AbstractCoreIT { deleteIndex( em.getApplicationId() ); + app.waitForQueueDrainAndRefreshIndex(); + // ----------------- test that we can read them, should fail // deleting sytem app index will interfere with other concurrently running tests @@ -283,7 +286,6 @@ public class RebuildIndexTest extends AbstractCoreIT { logger.info( "Rebuilt index" ); - app.refreshIndex(); } catch ( Exception ex ) { logger.error( "Error rebuilding index", ex ); @@ -292,7 +294,7 @@ public class RebuildIndexTest extends AbstractCoreIT { // ----------------- test that we can read them - Thread.sleep( 2000 ); + app.waitForQueueDrainAndRefreshIndex(15000); readData( em, collectionName, ENTITIES_TO_INDEX, 3 ); } @@ -343,7 +345,7 @@ public class RebuildIndexTest extends AbstractCoreIT { logger.info( "Created {} entities", ENTITIES_TO_INDEX ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(5000); // ----------------- test that we can read them, should work fine @@ -392,7 +394,6 @@ 
public class RebuildIndexTest extends AbstractCoreIT { logger.info( "Rebuilt index" ); - app.refreshIndex(); } catch ( Exception ex ) { logger.error( "Error rebuilding index", ex ); @@ -401,7 +402,7 @@ public class RebuildIndexTest extends AbstractCoreIT { // ----------------- test that we can read them - Thread.sleep( 2000 ); + app.waitForQueueDrainAndRefreshIndex(5000); results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, 3 ); assertEquals(results.size(),3); q = Query.fromQL("select * where location within 100 of "+lat+", "+lon); @@ -435,7 +436,7 @@ public class RebuildIndexTest extends AbstractCoreIT { final Entity secondEntity = em.create( "thing", entityData); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(5000); // ----------------- test that we can read them, should work fine @@ -493,7 +494,6 @@ public class RebuildIndexTest extends AbstractCoreIT { logger.info( "Rebuilt index" ); - app.refreshIndex(); } catch ( Exception ex ) { logger.error( "Error rebuilding index", ex ); @@ -502,7 +502,7 @@ public class RebuildIndexTest extends AbstractCoreIT { // ----------------- test that we can read them - Thread.sleep( 2000 ); + app.waitForQueueDrainAndRefreshIndex(5000); countEntities( em, collectionName, 1 ); } @@ -547,14 +547,14 @@ public class RebuildIndexTest extends AbstractCoreIT { ); ei.deleteApplication().toBlocking().lastOrDefault( null ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); } private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections ) throws Exception { - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 ); Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities ); @@ -593,7 +593,7 @@ public class RebuildIndexTest extends AbstractCoreIT { private int countEntities( EntityManager em, String 
collectionName, int expectedEntities) throws Exception { - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 ); Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java index d287d7e..3652b6f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java @@ -26,11 +26,8 @@ import java.util.UUID; import org.apache.usergrid.Application; import org.apache.usergrid.CoreApplication; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; -import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl; import org.apache.usergrid.corepersistence.index.ReIndexService; -import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl; import org.apache.usergrid.persistence.*; -import org.apache.usergrid.utils.UUIDUtils; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +40,6 @@ import org.apache.usergrid.persistence.cassandra.util.TraceTagManager; import org.apache.usergrid.persistence.cassandra.util.TraceTagReporter; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.setup.ConcurrentProcessSingleton; -import rx.functions.Func0; -import rx.functions.Func1; import rx.functions.Func2; import javax.annotation.concurrent.NotThreadSafe; @@ -140,7 
+135,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT { Thread.sleep( 500 ); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); // wait for it to appear in delete apps list @@ -164,7 +159,7 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT { // delete the application setup.getEmf().deleteApplication(deletedAppId); - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); found = findApps.call( deletedAppId, emf.getDeletedApplications() ); @@ -196,14 +191,14 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT { } }while (status.getStatus()!= ReIndexService.Status.COMPLETE); - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); // test to see that app now works and is happy // it should not be found in the deleted apps collection found = findApps.call( deletedAppId, emf.getDeletedApplications()); assertFalse("Restored app found in deleted apps collection", found); - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); apps = setup.getEmf().getApplications(); found = findApps.call(deletedAppId, apps); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java index e5c84f8..1f53c0a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ConnectionHelper.java @@ -76,7 +76,7 @@ public class ConnectionHelper extends CollectionIoHelper { @Override public Results getResults( Query query ) throws Exception { - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); query.setConnectionType( 
CONNECTION ); query.setEntityType( "test" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java index cea3b35..dcc8eae 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionTransitivePagingIT.java @@ -131,7 +131,7 @@ public class IntersectionTransitivePagingIT{ } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); return expected; http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java index 4d60164..3403dc8 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IntersectionUnionPagingIT.java @@ -135,7 +135,7 @@ public class IntersectionUnionPagingIT { } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java 
---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java index b2003fe..dac3f68 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java @@ -298,7 +298,7 @@ public class IteratingQueryIT { //we have to sleep, or we kill embedded cassandra } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(1000); long stop = System.currentTimeMillis(); @@ -367,7 +367,7 @@ public class IteratingQueryIT { expected.add( name ); } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -438,7 +438,7 @@ public class IteratingQueryIT { } } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); @@ -551,7 +551,7 @@ public class IteratingQueryIT { expectedResults.add( name ); } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -623,7 +623,7 @@ public class IteratingQueryIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); @@ -689,7 +689,7 @@ public class IteratingQueryIT { } } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); @@ -742,7 +742,7 @@ public class IteratingQueryIT { io.writeEntity( entity ); expected.add( name ); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -803,7 +803,7 @@ public class IteratingQueryIT { io.writeEntity( entity 
); expected.add( name ); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -865,7 +865,7 @@ public class IteratingQueryIT { expected.add( name ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); @@ -924,7 +924,7 @@ public class IteratingQueryIT { io.writeEntity( entity ); expected.add( name ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Thread.sleep(500); long stop = System.currentTimeMillis(); @@ -987,7 +987,7 @@ public class IteratingQueryIT { expected.add( name ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); logger.info( "Writes took {} ms", stop - start ); @@ -1050,7 +1050,7 @@ public class IteratingQueryIT { io.writeEntity( entity ); expected.add( name ); } - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -1110,7 +1110,7 @@ public class IteratingQueryIT { io.writeEntity( entity ); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); @@ -1216,7 +1216,7 @@ public class IteratingQueryIT { logger.info( "Writes took {} ms", stop - start ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "select * order by boolean desc, index asc" ); query.setLimit( queryLimit ); @@ -1322,7 +1322,7 @@ public class IteratingQueryIT { logger.info( "Writes took {} ms", stop - start ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); Query query = Query.fromQL( "select * where intersect = true OR intersect2 = true order by created, intersect desc" ); @@ -1384,7 +1384,7 @@ public class IteratingQueryIT { io.writeEntity( entity ); } - this.app.refreshIndex(); + this.app.waitForQueueDrainAndRefreshIndex(); long stop = System.currentTimeMillis(); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java index f7308da..3f5573f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/NotSubPropertyIT.java @@ -132,7 +132,7 @@ public class NotSubPropertyIT { logger.info( "Writes took {} ms", stop - start ); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); return expected; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java index 60c1622..89641a8 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/ParenthesisProblemIT.java @@ -72,7 +72,7 @@ public class ParenthesisProblemIT extends AbstractCoreIT { put("age",1); }}); - app.refreshIndex(); + app.waitForQueueDrainAndRefreshIndex(); final Results entities = em.searchCollection( em.getApplicationRef(), "cats", Query.fromQL(query)); http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/core/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties 
b/stack/core/src/test/resources/usergrid-custom-test.properties index c544967..8f9058d 100644 --- a/stack/core/src/test/resources/usergrid-custom-test.properties +++ b/stack/core/src/test/resources/usergrid-custom-test.properties @@ -49,6 +49,9 @@ collection.uniquevalues.authoritative.region=us-east # Queueing Test Settings # Reduce the long polling time for the tests queue.long.polling.time.millis=50 +elasticsearch.worker_count=8 +elasticsearch.worker_count_utility=8 +queue.get.timeout.seconds=8 # --- End: Usergrid cluster/actor system settings http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java index 061807b..778274e 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java @@ -165,7 +165,7 @@ public interface QakkaFig extends GuicyFig, Serializable { long getMaxShardSize(); @Key(QUEUE_LONG_POLL_TIME_MILLIS) - @Default("5000") + @Default("1000") long getLongPollTimeMillis(); /** Max time-to-live for queue message and payload data */ @@ -174,7 +174,7 @@ public interface QakkaFig extends GuicyFig, Serializable { int getMaxTtlSeconds(); @Key(QUEUE_IN_MEMORY) - @Default("true") + @Default("false") // in memory not ready yet; leave this to false else msgs could be processed more than once boolean getInMemoryCache(); @Key(QUEUE_IN_MEMORY_REFRESH_ASYNC) http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java index 09bb8de..fa5ee0b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java @@ -59,7 +59,7 @@ public class InMemoryQueue { } } - public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) { + synchronized public void add( String queueName, DatabaseQueueMessage databaseQueueMessage ) { UUID newest = newestByQueueName.get( queueName ); if ( newest == null ) { @@ -76,7 +76,7 @@ public class InMemoryQueue { getQueue( queueName ).add( databaseQueueMessage ); } - public UUID getNewest( String queueName ) { + synchronized public UUID getNewest( String queueName ) { if ( getQueue( queueName ).isEmpty() ) { newestByQueueName.remove( queueName ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java index ac2857f..fd4257b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java @@ -44,6 +44,7 @@ import java.util.ArrayList; import 
java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Singleton @@ -188,7 +189,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager { queueMessage.setData( json ); } catch (UnsupportedEncodingException e) { - logger.error("Error unencoding data for messageId=" + queueMessage.getMessageId(), e); + logger.error("Error decoding data for messageId=" + queueMessage.getMessageId(), e); } } else { try { @@ -201,6 +202,12 @@ public class QueueMessageManagerImpl implements QueueMessageManager { } queueMessages.add( queueMessage ); + } else if ( (System.currentTimeMillis() - dbMessage.getQueuedAt()) > TimeUnit.HOURS.toMillis(2) ) { + logger.warn("Queue Message does not have corresponding data after 2 hours, removing from queue - " + + "queueName: {}, region: {}, queueMessageId: {}", dbMessage.getQueueName(), dbMessage.getRegion(), + dbMessage.getQueueMessageId()); + queueMessageSerialization.deleteMessage(dbMessage.getQueueName(), dbMessage.getRegion(), + dbMessage.getShardId(), dbMessage.getType(), dbMessage.getQueueMessageId()); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java index 89c79ec..eb26b69 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java @@ -23,6 +23,7 @@ import com.codahale.metrics.Timer; import com.google.inject.Inject; 
import com.google.inject.Singleton; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.qakka.MetricsService; @@ -177,7 +178,7 @@ public class QueueActorHelper { } } - newestFetchedUuid.put( queueName, since ); + updateUUIDPointer(queueName, since); // Shard currentShard = multiShardIterator.getCurrentShard(); // if ( currentShard != null ) { @@ -279,7 +280,7 @@ public class QueueActorHelper { } - void queueRefresh( String queueName ) { + synchronized void queueRefresh( String queueName ) { Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time(); @@ -327,7 +328,7 @@ public class QueueActorHelper { startingShards.put( shardKey, shardId ); - lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() ); + updateLastRefreshedTime(queueName); if ( count > 0 ) { Object shard = shardIdOptional.isPresent() ? 
shardIdOptional.get() : "null"; @@ -346,4 +347,12 @@ public class QueueActorHelper { return queueName + "_" + type + region; } + private synchronized void updateUUIDPointer(String queueName, UUID newUUIDPointer){ + newestFetchedUuid.put( queueName, newUUIDPointer ); + } + + private synchronized void updateLastRefreshedTime(String queueName){ + lastRefreshTimeMillis.put( queueName, System.currentTimeMillis() ); + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java index 1ff8502..cbc7245 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java @@ -133,7 +133,7 @@ public class QueueActorRouter extends UntypedActor { getContext().dispatcher(), getSelf() ); shardAllocationSchedulersByQueueName.put( queueName, scheduler ); - logger.debug( "Created shard allocater for queue {}", queueName ); + logger.debug( "Created shard allocator for queue {}", queueName ); } } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/d3e988bc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java index 19059e6..75c1c22 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java @@ -139,8 +139,8 @@ public class ShardAllocator extends UntypedActor { shardSerialization.createShard( newShard ); shardCounterSerialization.incrementCounter( queueName, type, newShard.getShardId(), 0 ); - logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}", - this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue ); + logger.info("Allocated new shard for queue, newShardID: {}, queueName: {}, shardMessageCount: {}, usedPercent: {}%", + newShard.getShardId(), queueName, counterValue, (long)((double)counterValue/(double)qakkaFig.getMaxShardSize()*100) ); } else { // logger.debug("No new shard for queue {} counterValue {} of max {}",
