add consistent searching
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b93815f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b93815f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b93815f8 Branch: refs/heads/USERGRID-614 Commit: b93815f8f735794399cdf14bde850133bbb990c5 Parents: 36b5bad Author: Shawn Feldman <[email protected]> Authored: Tue May 5 13:35:28 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Tue May 5 13:35:28 2015 -0600 ---------------------------------------------------------------------- .../batch/service/SchedulerServiceImpl.java | 2 +- .../corepersistence/CpEntityManager.java | 17 +++++----- .../corepersistence/CpEntityManagerFactory.java | 4 +-- .../corepersistence/CpRelationManager.java | 23 +++++++++++++ .../usergrid/persistence/EntityManager.java | 3 ++ .../usergrid/persistence/RelationManager.java | 10 ++++++ .../org/apache/usergrid/CoreApplication.java | 2 +- .../usergrid/persistence/CollectionIT.java | 4 +-- .../index/impl/IndexRefreshCommandImpl.java | 31 ++++++++---------- .../persistence/index/impl/EntityIndexTest.java | 34 ++++++++++---------- 10 files changed, 80 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java index 646fa23..eedd52e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java @@ -435,6 +435,6 @@ public class SchedulerServiceImpl implements SchedulerService, JobAccessor, JobR @Override public void refreshIndex() { this.entityIndex = entityIndex == null ? injector.getInstance(EntityIndex.class) : entityIndex; - entityIndex.refreshAsync().toBlocking().last(); + entityIndex.refreshAsync().toBlocking().first(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 460fc11..d8721ea 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -420,10 +420,10 @@ public class CpEntityManager implements EntityManager { if(entity == null) { return null; } - Class clazz = Schema.getDefaultSchema().getEntityClass( entity.getId().getType() ); + Class clazz = Schema.getDefaultSchema().getEntityClass(entity.getId().getType()); - Entity oldFormatEntity = EntityFactory.newEntity(entity.getId().getUuid(),entity.getId().getType(),clazz); - oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) ); + Entity oldFormatEntity = EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), clazz); + oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity)); return oldFormatEntity; } @@ -654,7 +654,7 @@ public class CpEntityManager implements EntityManager { // } org.apache.usergrid.persistence.model.entity.Entity entity = - load( entityId ); + load(entityId); if ( entity != null ) { @@ -704,12 +704,11 @@ public class CpEntityManager implements EntityManager { return getRelationManager( entityRef ).searchCollection( collectionName, query ); } - // - // @Override - // public void setApplicationId( UUID applicationId ) { - // this.applicationId = applicationId; - // } + @Override + public Results searchCollectionConsistent( EntityRef entityRef, String collectionName, Query query, int expectedResults) throws Exception { + return getRelationManager( entityRef ).searchCollectionConsistent(collectionName, query, expectedResults ); + } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 6c375ef..6758977 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -378,7 +378,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application if (oldAppEntity != null) { managementEm.delete(oldAppEntity); applicationIdCache.evictAppId(oldAppEntity.getName()); - entityIndex.refreshAsync().toBlocking().last(); + entityIndex.refreshAsync().toBlocking().first(); } } catch (Exception e) { throw new RuntimeException(e); @@ -649,7 +649,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application // refresh special indexes without calling EntityManager refresh because stack overflow maybeCreateIndexes(); - return entityIndex.refreshAsync().toBlocking().last(); + return entityIndex.refreshAsync().toBlocking().first(); } private void maybeCreateIndexes() { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 6adeefc..7d75eed 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -652,6 +652,29 @@ public class CpRelationManager implements RelationManager { return new ObservableQueryExecutor( resultsObservable ).next(); } + @Override + public Results searchCollectionConsistent( String collName, Query query, int expectedResults ) throws Exception { + Results results; + long maxLength = 5000; + long sleepTime = 250; + boolean found; + long current = System.currentTimeMillis(), length = 0; + do { + results = searchCollection(collName, query); + length = System.currentTimeMillis() - current; + found = expectedResults == results.size(); + if(found){ + break; + } + Thread.sleep(sleepTime); + }while (!found && length <= maxLength); + if(logger.isInfoEnabled()){ + logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults)); + Thread.dumpStack(); + } + return results; + } + @Override public ConnectionRef createConnection( ConnectionRef connection ) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index 23b6d6b..b8d3360 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -403,6 +403,9 @@ public interface EntityManager { public Results searchCollection( EntityRef entityRef, String collectionName, Query query ) throws Exception; + public Results searchCollectionConsistent( EntityRef entityRef, String collectionName, Query query, int expectedHits) + throws Exception; + public Set<String> getCollectionIndexes( EntityRef entity, String collectionName ) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java index 218ea3a..84e4a38 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java @@ -66,6 +66,16 @@ public interface RelationManager { public Results searchCollection( String collectionName, Query query ) throws Exception; + /** + * this loops for consistentcy and is dangerous to run often + * @param collectionName + * @param query + * @param expectedResults + * @return + * @throws Exception + */ + public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults ) throws Exception; + public ConnectionRef createConnection( ConnectionRef connection ) throws Exception; public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java index 01029cf..00cadd0 100644 --- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java +++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java @@ -230,7 +230,7 @@ public class CoreApplication implements Application, TestRule { @Override public synchronized void refreshIndex() { //Insert test entity and find it - entityIndex.refreshAsync().toBlocking().last(); + entityIndex.refreshAsync().toBlocking().first(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java index 9abfac9..0315055 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java @@ -363,11 +363,9 @@ public class CollectionIT extends AbstractCoreIT { app.refreshIndex(); - Thread.sleep(1000); //TODO find why we have to wait. This is a bug - final Query query = Query.fromQL( "nickname = 'ed'" ); - Results r = em.searchCollection( group, "users", query.withResultsLevel( Level.LINKED_PROPERTIES ) ); + Results r = em.searchCollectionConsistent( group, "users", query.withResultsLevel( Level.LINKED_PROPERTIES ),1 ); LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) ); assertEquals( 1, r.size() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 34f0e6e..5c79422 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -84,8 +84,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { @Override - public Observable<IndexRefreshCommandInfo> execute( String[] indexes ) { - + public synchronized Observable<IndexRefreshCommandInfo> execute( String[] indexes ) { final long start = System.currentTimeMillis(); @@ -131,13 +130,12 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { .termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, entityId ) ); - return new IndexRefreshCommandInfo( builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start ); } catch ( Exception ee ) { logger.error( "Failed during refresh search for " + uuid, ee ); - throw new RuntimeException( "Failed during refresh search for " + uuid, ee ); + throw new RuntimeException("Failed during refresh search for " + uuid, ee ); } } ).skipWhile( info -> !info.hasFinished() ); @@ -167,26 +165,25 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { } } logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ), - successfulShards, failedShards ); - } ) + successfulShards, failedShards); + }) //once the refresh is done execute the search - .flatMap( refreshCommandResult -> searchObservable ) + .flatMap(refreshCommandResult -> searchObservable) //check when found - .doOnNext( found -> { - if ( !found.hasFinished() ) { - logger.error( "Couldn't find record during refresh uuid: {} took ms:{} ", uuid, - found.getExecutionTime() ); - } - else { - logger.info( "found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime() ); + .doOnNext(found -> { + if (!found.hasFinished()) { + logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, + found.getExecutionTime()); + } else { + logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); } - } ).doOnCompleted( () -> { + }).doOnCompleted(() -> { //clean up our data - String[] aliases = indexCache.getIndexes( alias, AliasedEntityIndex.AliasType.Read ); + String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read); DeIndexOperation deIndexRequest = - new DeIndexOperation( aliases, appScope, edge, entity.getId(), entity.getVersion() ); + new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion()); //delete the item IndexOperationMessage indexOperationMessage = new IndexOperationMessage(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index d89509e..c9240fe 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -153,7 +153,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexEdge, entity2 ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); StopWatch timer = new StopWatch(); @@ -299,7 +299,7 @@ public class EntityIndexTest extends BaseIT { ei.addIndex("v2", 1, 0, "one"); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 1 ); @@ -308,7 +308,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch entityIndexBatch = entityIndex.createBatch(); entityIndexBatch.deindex( searchEdge, crs.get( 0 ) ); entityIndexBatch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); //Hilda Youn testQuery( searchEdge, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0 ); @@ -323,7 +323,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex ); batch.execute().toBlocking().last(); - IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().last(); + IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().first(); long time = info.getExecutionTime(); log.info("refresh took ms:"+time); } @@ -388,7 +388,7 @@ public class EntityIndexTest extends BaseIT { entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID())); entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); CandidateResults candidateResults = entityIndex .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 ); @@ -397,7 +397,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.deindex( searchEdge, entity ).execute().toBlocking().last(); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); candidateResults = entityIndex .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 ); @@ -526,7 +526,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexSCope, user ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String query = "where username = 'edanuff'"; @@ -535,7 +535,7 @@ public class EntityIndexTest extends BaseIT { batch.deindex( indexSCope, user.getId(), user.getVersion() ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); // EntityRef @@ -596,7 +596,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexScope, fred); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final SearchTypes searchTypes = SearchTypes.fromTypes( "user" ); @@ -620,7 +620,7 @@ public class EntityIndexTest extends BaseIT { Id appId = new SimpleId( "entityindextest" ); assertNotEquals( "cluster should be ok", Health.RED, ei.getClusterHealth() ); assertEquals( "index should be ready", Health.GREEN, ei.getIndexHealth() ); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); assertNotEquals( "cluster should be fine", Health.RED, ei.getIndexHealth() ); assertNotEquals( "cluster should be ready now", Health.RED, ei.getClusterHealth() ); } @@ -678,7 +678,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final int limit = 5; @@ -744,7 +744,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexSCope, user ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String query = "where searchUUID = " + searchUUID; @@ -783,7 +783,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexSCope, user ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String query = "where string = 'I am*'"; @@ -840,7 +840,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexSCope, first ); batch.index( indexSCope, second ); batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String ascQuery = "order by string"; @@ -905,7 +905,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String singleMatchQuery = "string contains 'alpha' OR string contains 'foo'"; @@ -986,7 +986,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String notFirst = "NOT int = 1"; @@ -1096,7 +1096,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().toBlocking().last(); - ei.refreshAsync().toBlocking().last(); + ei.refreshAsync().toBlocking().first(); final String notFirst = "NOT string = 'I ate a sammich'";
