add consistent searching

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b93815f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b93815f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b93815f8

Branch: refs/heads/USERGRID-614
Commit: b93815f8f735794399cdf14bde850133bbb990c5
Parents: 36b5bad
Author: Shawn Feldman <[email protected]>
Authored: Tue May 5 13:35:28 2015 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Tue May 5 13:35:28 2015 -0600

----------------------------------------------------------------------
 .../batch/service/SchedulerServiceImpl.java     |  2 +-
 .../corepersistence/CpEntityManager.java        | 17 +++++-----
 .../corepersistence/CpEntityManagerFactory.java |  4 +--
 .../corepersistence/CpRelationManager.java      | 23 +++++++++++++
 .../usergrid/persistence/EntityManager.java     |  3 ++
 .../usergrid/persistence/RelationManager.java   | 10 ++++++
 .../org/apache/usergrid/CoreApplication.java    |  2 +-
 .../usergrid/persistence/CollectionIT.java      |  4 +--
 .../index/impl/IndexRefreshCommandImpl.java     | 31 ++++++++----------
 .../persistence/index/impl/EntityIndexTest.java | 34 ++++++++++----------
 10 files changed, 80 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java
index 646fa23..eedd52e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/batch/service/SchedulerServiceImpl.java
@@ -435,6 +435,6 @@ public class SchedulerServiceImpl implements 
SchedulerService, JobAccessor, JobR
     @Override
     public void refreshIndex() {
         this.entityIndex = entityIndex == null ? 
injector.getInstance(EntityIndex.class) : entityIndex;
-        entityIndex.refreshAsync().toBlocking().last();
+        entityIndex.refreshAsync().toBlocking().first();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 460fc11..d8721ea 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -420,10 +420,10 @@ public class CpEntityManager implements EntityManager {
         if(entity == null) {
             return null;
         }
-        Class clazz = Schema.getDefaultSchema().getEntityClass( 
entity.getId().getType() );
+        Class clazz = 
Schema.getDefaultSchema().getEntityClass(entity.getId().getType());
 
-        Entity oldFormatEntity = 
EntityFactory.newEntity(entity.getId().getUuid(),entity.getId().getType(),clazz);
-        oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) );
+        Entity oldFormatEntity = 
EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), 
clazz);
+        oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity));
 
         return oldFormatEntity;
     }
@@ -654,7 +654,7 @@ public class CpEntityManager implements EntityManager {
         //        }
 
         org.apache.usergrid.persistence.model.entity.Entity entity =
-                load( entityId  );
+                load(entityId);
 
         if ( entity != null ) {
 
@@ -704,12 +704,11 @@ public class CpEntityManager implements EntityManager {
         return getRelationManager( entityRef ).searchCollection( 
collectionName, query );
     }
 
-    //
-    //    @Override
-    //    public void setApplicationId( UUID applicationId ) {
-    //        this.applicationId = applicationId;
-    //    }
+    @Override
+    public Results searchCollectionConsistent( EntityRef entityRef, String 
collectionName, Query query, int expectedResults) throws Exception {
 
+        return getRelationManager( entityRef 
).searchCollectionConsistent(collectionName, query, expectedResults );
+    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6c375ef..6758977 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -378,7 +378,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
                     if (oldAppEntity != null) {
                         managementEm.delete(oldAppEntity);
                         applicationIdCache.evictAppId(oldAppEntity.getName());
-                        entityIndex.refreshAsync().toBlocking().last();
+                        entityIndex.refreshAsync().toBlocking().first();
                     }
                 } catch (Exception e) {
                     throw new RuntimeException(e);
@@ -649,7 +649,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         // refresh special indexes without calling EntityManager refresh 
because stack overflow
         maybeCreateIndexes();
 
-        return entityIndex.refreshAsync().toBlocking().last();
+        return entityIndex.refreshAsync().toBlocking().first();
     }
 
     private void maybeCreateIndexes() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6adeefc..7d75eed 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -652,6 +652,29 @@ public class CpRelationManager implements RelationManager {
         return new ObservableQueryExecutor( resultsObservable ).next();
     }
 
+    @Override
+    public Results searchCollectionConsistent( String collName, Query query, 
int expectedResults ) throws Exception {
+        Results results;
+        long maxLength = 5000;
+        long sleepTime = 250;
+        boolean found;
+        long current = System.currentTimeMillis(), length = 0;
+        do {
+            results = searchCollection(collName, query);
+            length = System.currentTimeMillis() - current;
+            found = expectedResults == results.size();
+            if(found){
+                break;
+            }
+            Thread.sleep(sleepTime);
+        }while (!found && length <= maxLength);
+        if(logger.isInfoEnabled()){
+            logger.info(String.format("Consistent Search finished in %s,  
results=%s, expected=%s...dumping stack",length, 
results.size(),expectedResults));
+            Thread.dumpStack();
+        }
+        return results;
+    }
+
 
     @Override
     public ConnectionRef createConnection( ConnectionRef connection ) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 23b6d6b..b8d3360 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -403,6 +403,9 @@ public interface EntityManager {
     public Results searchCollection( EntityRef entityRef, String 
collectionName, Query query )
             throws Exception;
 
+    public Results searchCollectionConsistent( EntityRef entityRef, String 
collectionName, Query query, int expectedHits)
+        throws Exception;
+
     public Set<String> getCollectionIndexes( EntityRef entity, String 
collectionName )
             throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java 
b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
index 218ea3a..84e4a38 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/persistence/RelationManager.java
@@ -66,6 +66,16 @@ public interface RelationManager {
 
     public Results searchCollection( String collectionName, Query query ) 
throws Exception;
 
+    /**
+     * this loops for consistency and is dangerous to run often
+     * @param collectionName
+     * @param query
+     * @param expectedResults
+     * @return
+     * @throws Exception
+     */
+    public Results searchCollectionConsistent( String collectionName, Query 
query, int expectedResults ) throws Exception;
+
     public ConnectionRef createConnection( ConnectionRef connection ) throws 
Exception;
 
     public ConnectionRef createConnection( String connectionType, EntityRef 
connectedEntityRef ) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java 
b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 01029cf..00cadd0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -230,7 +230,7 @@ public class CoreApplication implements Application, 
TestRule {
     @Override
     public synchronized void refreshIndex() {
         //Insert test entity and find it
-        entityIndex.refreshAsync().toBlocking().last();
+        entityIndex.refreshAsync().toBlocking().first();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java 
b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 9abfac9..0315055 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -363,11 +363,9 @@ public class CollectionIT extends AbstractCoreIT {
 
         app.refreshIndex();
 
-        Thread.sleep(1000); //TODO find why we have to wait.  This is a bug
-
         final Query query = Query.fromQL( "nickname = 'ed'" );
 
-        Results r = em.searchCollection( group, "users", 
query.withResultsLevel( Level.LINKED_PROPERTIES ) );
+        Results r = em.searchCollectionConsistent( group, "users", 
query.withResultsLevel( Level.LINKED_PROPERTIES ),1 );
 
         LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
         assertEquals( 1, r.size() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 34f0e6e..5c79422 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -84,8 +84,7 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
 
 
     @Override
-    public Observable<IndexRefreshCommandInfo> execute( String[] indexes ) {
-
+    public synchronized Observable<IndexRefreshCommandInfo> execute( String[] 
indexes ) {
 
         final long start = System.currentTimeMillis();
 
@@ -131,13 +130,12 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
                                                                        
.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
                                                                            
entityId ) );
 
-
                     return new IndexRefreshCommandInfo( 
builder.execute().get().getHits().totalHits() > 0,
                         System.currentTimeMillis() - start );
                 }
                 catch ( Exception ee ) {
                     logger.error( "Failed during refresh search for " + uuid, 
ee );
-                    throw new RuntimeException( "Failed during refresh search 
for " + uuid, ee );
+                    throw new RuntimeException("Failed during refresh search 
for " + uuid, ee );
                 }
             } ).skipWhile( info -> !info.hasFinished() );
 
@@ -167,26 +165,25 @@ public class IndexRefreshCommandImpl implements 
IndexRefreshCommand {
                     }
                 }
                 logger.debug( "Refreshed indexes: {},success:{} failed:{} ", 
StringUtils.join( indexes, ", " ),
-                    successfulShards, failedShards );
-            } )
+                    successfulShards, failedShards);
+            })
 
                 //once the refresh is done execute the search
-            .flatMap( refreshCommandResult -> searchObservable )
+            .flatMap(refreshCommandResult -> searchObservable)
 
                 //check when found
-            .doOnNext( found -> {
-                if ( !found.hasFinished() ) {
-                    logger.error( "Couldn't find record during refresh uuid: 
{} took ms:{} ", uuid,
-                        found.getExecutionTime() );
-                }
-                else {
-                    logger.info( "found record during refresh uuid: {} took 
ms:{} ", uuid, found.getExecutionTime() );
+            .doOnNext(found -> {
+                if (!found.hasFinished()) {
+                    logger.error("Couldn't find record during refresh uuid: {} 
took ms:{} ", uuid,
+                        found.getExecutionTime());
+                } else {
+                    logger.info("found record during refresh uuid: {} took 
ms:{} ", uuid, found.getExecutionTime());
                 }
-            } ).doOnCompleted( () -> {
+            }).doOnCompleted(() -> {
                 //clean up our data
-                String[] aliases = indexCache.getIndexes( alias, 
AliasedEntityIndex.AliasType.Read );
+                String[] aliases = indexCache.getIndexes(alias, 
AliasedEntityIndex.AliasType.Read);
                 DeIndexOperation deIndexRequest =
-                    new DeIndexOperation( aliases, appScope, edge, 
entity.getId(), entity.getVersion() );
+                    new DeIndexOperation(aliases, appScope, edge, 
entity.getId(), entity.getVersion());
 
                 //delete the item
                 IndexOperationMessage indexOperationMessage = new 
IndexOperationMessage();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b93815f8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index d89509e..c9240fe 100644
--- 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -153,7 +153,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexEdge, entity2 );
         batch.execute().toBlocking().last();
 
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         StopWatch timer = new StopWatch();
@@ -299,7 +299,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         ei.addIndex("v2", 1, 0, "one");
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         insertJsonBlob( entityIndex, entityType, searchEdge, 
"/sample-large.json", 1, 1 );
 
@@ -308,7 +308,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
         entityIndexBatch.deindex( searchEdge, crs.get( 0 ) );
         entityIndexBatch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         //Hilda Youn
         testQuery( searchEdge, searchTypes, entityIndex, "name = 'Bowers 
Oneil'", 0 );
@@ -323,7 +323,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
         insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, 
startIndex );
         batch.execute().toBlocking().last();
-        IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  
ei.refreshAsync().toBlocking().last();
+        IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  
ei.refreshAsync().toBlocking().first();
         long time = info.getExecutionTime();
         log.info("refresh took ms:"+time);
     }
@@ -388,7 +388,7 @@ public class EntityIndexTest extends BaseIT {
         entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, 
UUID.randomUUID()));
 
         entityIndex.createBatch().index( searchEdge, entity 
).execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         CandidateResults candidateResults = entityIndex
             .search(searchEdge, SearchTypes.fromTypes( 
entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 );
@@ -397,7 +397,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
         batch.deindex( searchEdge, entity ).execute().toBlocking().last();
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         candidateResults = entityIndex
             .search(searchEdge, SearchTypes.fromTypes( 
entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 );
@@ -526,7 +526,7 @@ public class EntityIndexTest extends BaseIT {
 
         batch.index( indexSCope, user );
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         final String query = "where username = 'edanuff'";
 
@@ -535,7 +535,7 @@ public class EntityIndexTest extends BaseIT {
 
         batch.deindex( indexSCope, user.getId(), user.getVersion() );
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         // EntityRef
 
@@ -596,7 +596,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope, fred);
 
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );
 
@@ -620,7 +620,7 @@ public class EntityIndexTest extends BaseIT {
         Id appId = new SimpleId( "entityindextest" );
         assertNotEquals( "cluster should be ok", Health.RED, 
ei.getClusterHealth() );
         assertEquals( "index should be ready", Health.GREEN, 
ei.getIndexHealth() );
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
         assertNotEquals( "cluster should be fine", Health.RED, 
ei.getIndexHealth() );
         assertNotEquals( "cluster should be ready now", Health.RED, 
ei.getClusterHealth() );
     }
@@ -678,7 +678,7 @@ public class EntityIndexTest extends BaseIT {
 
         batch.execute().toBlocking().last();
 
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         final int limit = 5;
@@ -744,7 +744,7 @@ public class EntityIndexTest extends BaseIT {
 
         batch.index( indexSCope, user );
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         final String query = "where searchUUID = " + searchUUID;
 
@@ -783,7 +783,7 @@ public class EntityIndexTest extends BaseIT {
 
         batch.index( indexSCope, user );
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
         final String query = "where string = 'I am*'";
 
@@ -840,7 +840,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexSCope, first );
         batch.index( indexSCope, second );
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         final String ascQuery = "order by string";
@@ -905,7 +905,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         final String singleMatchQuery = "string contains 'alpha' OR string 
contains 'foo'";
@@ -986,7 +986,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         final String notFirst = "NOT int = 1";
@@ -1096,7 +1096,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.execute().toBlocking().last();
-        ei.refreshAsync().toBlocking().last();
+        ei.refreshAsync().toBlocking().first();
 
 
         final String notFirst = "NOT string = 'I ate a sammich'";

Reply via email to