Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-607 [created] 79aea6ab9


Added tests for usergrid 607 and other issues were fixed in previous releases.
Added new method that gets all edge documents out of elastic search and returns 
them.
Added comment to IndexEdge about the differentiation between IndexEdge and 
SearchEdge.
Removed the timeout in EsIndexBufferConsumerImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b52ac2f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b52ac2f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b52ac2f4

Branch: refs/heads/USERGRID-607
Commit: b52ac2f47d918a3390fd2bc65fbd543c2e78067a
Parents: f5cb788
Author: GERey <[email protected]>
Authored: Wed May 13 12:26:58 2015 -0700
Committer: GERey <[email protected]>
Committed: Wed May 13 12:26:58 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java  |   1 -
 .../corepersistence/index/IndexServiceImpl.java |  62 +++++-
 .../corepersistence/index/IndexServiceTest.java | 203 ++++++++++++++++++-
 .../index/ApplicationEntityIndex.java           |  12 ++
 .../usergrid/persistence/index/IndexEdge.java   |   6 +
 .../impl/EsApplicationEntityIndexImpl.java      |  53 ++++-
 .../index/impl/EsEntityIndexBatchImpl.java      |   3 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |   5 +-
 8 files changed, 333 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..a05057d 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -80,7 +80,6 @@ public class InMemoryAsyncEventService implements 
AsyncEventService {
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, 
final Edge edge ) {
-
         run( eventBuilder.queueDeleteEdge( applicationScope, edge ) );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8185b4d..d616090 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.Iterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,10 +35,13 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -163,16 +168,54 @@ public class IndexServiceImpl implements IndexService {
     }
 
 
+    //Steps to delete an IndexEdge.
+    //1.Take the search edge given and search for all the edges in 
elasticsearch matching that search edge
+    //2. Batch Delete all of those edges returned in the previous search.
+    //TODO: optimize loops further.
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final 
ApplicationScope applicationScope,
                                                               final Edge edge 
) {
 
+        final Observable<IndexOperationMessage> batches =
+            Observable.just( edge ).flatMap( edgeValue -> {
+                final ApplicationEntityIndex ei = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
+                EntityIndexBatch batch = ei.createBatch();
 
-        //TODO, query ES and remove this edge
 
-        throw new NotImplementedException( "Implement me" );
-    }
+                //review why generating the Scope from the Source  and the 
target node makes sense.
+                final IndexEdge fromSource = generateScopeFromSource( edge );
+                final Id targetId = edge.getTargetNode();
+
+
+                CandidateResults targetEdgesToBeDeindexed = 
ei.getAllEdgeDocuments( fromSource, targetId, 1000, 0 );
+
+                //Should loop thorugh and query for all documents and if there 
are no documents then the loop should exit.
+                do{
+                    batch = deindexBatchIteratorResolver( fromSource, 
targetEdgesToBeDeindexed, batch );
+                    if(!targetEdgesToBeDeindexed.getOffset().isPresent())
+                        break;
+                    targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( 
fromSource, targetId, 1000, targetEdgesToBeDeindexed.getOffset().get() );
+                }while(!targetEdgesToBeDeindexed.isEmpty());
+
+
+
+                final IndexEdge fromTarget = generateScopeFromTarget( edge );
+                final Id sourceId = edge.getSourceNode();
+
+                CandidateResults sourceEdgesToBeDeindexed = 
ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, 0 );
 
+                do{
+                    batch = deindexBatchIteratorResolver( fromTarget, 
sourceEdgesToBeDeindexed, batch );
+                    if(!sourceEdgesToBeDeindexed.getOffset().isPresent())
+                        break;
+                    sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( 
fromTarget, sourceId, 1000, sourceEdgesToBeDeindexed.getOffset().get()  );
+                }while(!sourceEdgesToBeDeindexed.isEmpty());
+
+                return batch.execute();
+            } );
+
+        return ObservableTimer.time( batches, addTimer );
+    }
 
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final 
ApplicationScope applicationScope,
@@ -222,6 +265,19 @@ public class IndexServiceImpl implements IndexService {
                               .map( edge -> generateScopeFromTarget( edge ) );
     }
 
+    /**
+     * Takes in candidate results and uses the iterator to create batch 
commands
+     */
+
+    public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge 
edge,CandidateResults edgesToBeDeindexed, EntityIndexBatch batch){
+        Iterator itr = edgesToBeDeindexed.iterator();
+        while( itr.hasNext() ) {
+            CandidateResult cr = ( CandidateResult ) itr.next();
+            batch.deindex( edge, cr.getId(), cr.getVersion() );
+        }
+        return batch;
+    }
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 5570df1..fffe6a2 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -29,6 +31,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
+import 
org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.Candidate;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -173,9 +176,9 @@ public class IndexServiceTest {
         final ApplicationEntityIndex applicationEntityIndex =
             entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
 
+        //query until the collection edge is available
         final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
 
-        //query until it's available
         final CandidateResults collectionResults = getResults( 
applicationEntityIndex, collectionSearchEdge,
             SearchTypes.fromTypes( testEntity.getId().getType() ), 1);
 
@@ -184,10 +187,9 @@ public class IndexServiceTest {
         assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
 
 
+        //query until the connection edge is available
         final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
 
-
-        //query until it's available
         final CandidateResults connectionResults = getResults( 
applicationEntityIndex, connectionSearchEdge,
             SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
 
@@ -247,7 +249,7 @@ public class IndexServiceTest {
         //get the first and last edge
         final Edge connectionSearch = connectionSearchEdges.get( 0 );
 
-        final Edge lastSearch = connectionSearchEdges.get( edgeCount-1 );
+        final Edge lastSearch = connectionSearchEdges.get( edgeCount - 1 );
 
 
         //now index
@@ -298,6 +300,199 @@ public class IndexServiceTest {
     }
 
 
+
+
+    /**
+     *This test must do the following steps.
+     *1. Delete the connecting edge
+     *2. Run the deleteIndexEdge using the search edge that gets returned from 
the delete call
+     *3. Run queries to make sure that the collection entity still exists 
while the connection search edge is gone.
+     * @throws InterruptedException
+     */
+    @Test
+    public void testDeleteSingleConnectingEdge() throws InterruptedException {
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), 
"application" ) );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        final Entity testEntity = new Entity( createId( "thing" ), 
UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+        //write entity
+        final Edge connectionSearch =
+            createTestEntityAndReturnConnectionEdge( 
applicationScope,graphManager,testEntity );
+
+
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
+        //step 1
+        //(We need to mark then delete things in the graph manager.)
+        final Edge toBeMarkedEdge = graphManager.markEdge( connectionSearch 
).toBlocking().firstOrDefault( null );
+        final Edge toBeDeletedEdge = graphManager.deleteEdge( toBeMarkedEdge 
).toBlocking().firstOrDefault( null );
+
+        //step 2
+        IndexOperationMessage indexOperationMessage =
+            indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge 
).toBlocking().lastOrDefault(
+            null );
+
+        assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
+
+        //ensure that no edges remain
+        final CandidateResults connectionResultsEmpty = 
applicationEntityIndex.search( connectionSearchEdge,
+            SearchTypes.fromTypes( "things" ),"select *",10,0 );
+
+        assertEquals(0,connectionResultsEmpty.size());
+
+    }
+
+    @Test
+    public void testDeleteMultipleConnectingEdges() throws 
InterruptedException {
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), 
"application" ) );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
+
+        final Entity testEntity = new Entity( createId( "thing" ), 
UUIDGenerator.newTimeUUID() );
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write entity
+        Edge collectionEdge = createEntityandCollectionEdge( applicationScope, 
graphManager, testEntity );
+        //Write multiple connection edges
+        final int edgeCount = 5;
+
+        final List<Edge> connectionSearchEdges = createConnectionSearchEdges( 
testEntity, graphManager, edgeCount );
+
+        indexService.indexEntity( applicationScope, testEntity 
).toBlocking().getIterator();
+
+        //query until results are available for collections
+        final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+        getResults( applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
+
+        for(int i = 0; i < edgeCount; i++) {
+            //query until results are available for connections
+
+            final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearchEdges.get( i ) );
+            getResults( applicationEntityIndex, connectionSearchEdge, 
SearchTypes.fromTypes( testEntity.getId().getType() ),
+                 1 );
+        }
+
+        for(Edge connectionSearch:connectionSearchEdges) {
+            //step 1
+            final Edge toBeMarkedEdge = graphManager.markEdge( 
connectionSearch ).toBlocking().firstOrDefault( null );
+            final Edge toBeDeletedEdge = graphManager.deleteEdge( 
toBeMarkedEdge ).toBlocking().first();
+
+            final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+
+            //step 2
+            IndexOperationMessage indexOperationMessage =
+                indexService.deleteIndexEdge( applicationScope, 
toBeDeletedEdge ).toBlocking().lastOrDefault( null );
+
+            //not sure if this is still valid.
+            assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() 
);
+
+            //ensure that no edges remain
+            final CandidateResults connectionResultsEmpty = 
applicationEntityIndex.search( connectionSearchEdge,
+                SearchTypes.fromTypes( "things" ),"select *",10,0 );
+
+            assertEquals(0,connectionResultsEmpty.size());
+        }
+    }
+
+
+    /**
+     * Refactor into two methods . Should only have one responsiblitiy.
+     * @param applicationScope
+     * @param graphManager
+     * @return
+     */
+    private Edge createTestEntityAndReturnConnectionEdge( final 
ApplicationScope applicationScope,
+                                                          final GraphManager 
graphManager,
+                                                          final Entity 
testEntity) {
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope 
);
+
+        final Edge collectionEdge =
+            createEntityandCollectionEdge( applicationScope, graphManager, 
testEntity );
+
+
+        //create our connection edge.
+        final Id connectingId = createId( "connecting" );
+        final Edge connectionEdge = CpNamingUtils.createConnectionEdge( 
connectingId, "likes", testEntity.getId() );
+
+        final Edge connectionSearch = graphManager.writeEdge( connectionEdge 
).toBlocking().last();
+
+        //now index
+        indexService.indexEntity( applicationScope, testEntity 
).count().toBlocking().last();
+
+        //query until results are available for collections
+        final SearchEdge collectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
+        getResults( applicationEntityIndex, collectionSearchEdge, 
SearchTypes.fromTypes( testEntity.getId().getType() ),
+            1 );
+
+        //query until results are available for connections
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+        getResults( applicationEntityIndex, connectionSearchEdge, 
SearchTypes.fromTypes( testEntity.getId().getType() ),
+            1 );
+
+        return connectionSearch;
+    }
+
+
+    /**
+     * Creates an entity along with the corresponding collection edge.
+     * @param applicationScope
+     * @param graphManager
+     * @param testEntity
+     * @return
+     */
+    private Edge createEntityandCollectionEdge( final ApplicationScope 
applicationScope,
+                                                final GraphManager 
graphManager, final Entity testEntity) {
+
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), testEntity.getId().getType(),
+                testEntity.getId() );
+
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+        return collectionEdge;
+    }
+
+
+    private List<Edge> createConnectionSearchEdges(
+        final Entity testEntity, final GraphManager graphManager, final int 
edgeCount ) {
+
+        final List<Edge> connectionSearchEdges = Observable.range( 0, 
edgeCount ).flatMap( integer -> {
+
+            //create our connection edge.
+            final Id connectingId = createId( "connecting" );
+            final Edge connectionEdge = CpNamingUtils.createConnectionEdge( 
connectingId, "likes", testEntity.getId() );
+
+            return graphManager.writeEdge( connectionEdge ).subscribeOn( 
Schedulers.io() );
+        }, 20).toList().toBlocking().last();
+
+
+        assertEquals( "All edges saved", edgeCount, 
connectionSearchEdges.size() );
+        return connectionSearchEdges;
+    }
+
+
     private CandidateResults getResults( final ApplicationEntityIndex 
applicationEntityIndex,
                                          final SearchEdge searchEdge, final 
SearchTypes searchTypes,
                                          final int expectedSize ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index f4f7bcd..20c3f12 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.index;
 
 
+import org.apache.usergrid.persistence.model.entity.Id;
+
 import rx.Observable;
 
 /**
@@ -45,6 +47,16 @@ public interface ApplicationEntityIndex {
     CandidateResults search( final SearchEdge searchEdge, final SearchTypes 
searchTypes, final String query,
                              final int limit, final int offset );
 
+
+    /**
+     * Same as search, just iterates all documents that match the index edge 
exactly
+     * @param edge
+     * @param limit
+     * @param offset
+     * @return
+     */
+    CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id 
entityId,  final int limit, final int offset);
+
     /**
      * delete all application records
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
index 145ccba..36aa240 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
@@ -25,6 +25,12 @@ package org.apache.usergrid.persistence.index;
 /**
  * An edge to perform indexing on.
  */
+/**
+ *source node - edge - target node =>
+ IndexEdge => sourceNode, edgeType, timestamp, NodeType we're indexing (TARGET)
+ SearchEdge = > sourceNode, edgeType, NodeType (Target)
+
+ */
 public interface IndexEdge extends SearchEdge {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 5b67060..f004ce8 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -30,6 +30,8 @@ import 
org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
 import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -46,16 +48,19 @@ import 
org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.query.ParsedQuery;
 import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
+import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
 import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -148,7 +153,8 @@ public class EsApplicationEntityIndexImpl implements 
ApplicationEntityIndex {
 
         final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
 
-        final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, 
searchTypes, parsedQuery, limit, offset );
+        final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, 
searchTypes, parsedQuery, limit, offset )
+                                                      .setTimeout( 
TimeValue.timeValueMillis(queryTimeout) );
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Searching index (read alias): {}\n  nodeId: {}, 
edgeType: {},  \n type: {}\n   query: {} ",
@@ -159,7 +165,50 @@ public class EsApplicationEntityIndexImpl implements 
ApplicationEntityIndex {
         try {
             //Added For Graphite Metrics
             Timer.Context timeSearch = searchTimer.time();
-            searchResponse = srb.execute().actionGet(queryTimeout);
+            searchResponse = srb.execute().actionGet();
+            timeSearch.stop();
+        }
+        catch ( Throwable t ) {
+            logger.error( "Unable to communicate with Elasticsearch", t );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }
+        failureMonitor.success();
+
+        return parseResults(searchResponse, parsedQuery, limit, offset);
+    }
+
+
+    @Override
+    public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final 
Id entityId, final int limit,
+                                                 final int offset ) {
+        /**
+         * Take a list of IndexEdge, with an entityId
+         and query Es directly for matches
+
+         */
+        IndexValidationUtils.validateSearchEdge( edge );
+        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+        SearchResponse searchResponse;
+
+        final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+        FilterBuilders.idsFilter( entityId.getType() );
+
+        final SearchRequestBuilder srb = searchRequest.getBuilder( edge, 
SearchTypes.fromTypes( entityId.getType() ),
+            parsedQuery, limit, offset ).setTimeout( 
TimeValue.timeValueMillis( queryTimeout ) );
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Searching for edge index (read alias): {}\n  
nodeId: {}, edgeType: {},  \n type: {}\n   query: {} ",
+                this.alias.getReadAlias(), edge.getNodeId(), 
edge.getEdgeName(),
+                SearchTypes.fromTypes( entityId.getType()), srb );
+        }
+
+        try {
+            //Added For Graphite Metrics
+            Timer.Context timeSearch = searchTimer.time();
+            searchResponse = srb.execute().actionGet();
             timeSearch.stop();
         }
         catch ( Throwable t ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 956f1d5..e50ee73 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -130,7 +130,8 @@ public class EsEntityIndexBatchImpl implements 
EntityIndexBatch {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
 
-        return indexBatchBufferProducer.put( tempContainer );
+        Observable observable = indexBatchBufferProducer.put( tempContainer );
+        return  observable;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 469893e..5a71444 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -170,6 +170,9 @@ public class EsIndexBufferConsumerImpl implements 
IndexBufferConsumer {
 
             final Observable<IndexOperation> index = Observable.from( 
batch.getIndexRequests() );
             final Observable<DeIndexOperation> deIndex = Observable.from( 
batch.getDeIndexRequests() );
+            if(indexOperationSetSize +  deIndexOperationSetSize > 0){
+                batch.done();
+            }
 
             return Observable.merge( index, deIndex );
         } );
@@ -230,7 +233,7 @@ public class EsIndexBufferConsumerImpl implements 
IndexBufferConsumer {
 
 
         try {
-            responses = bulkRequest.execute().actionGet( 
indexFig.getWriteTimeout() );
+            responses = bulkRequest.execute().actionGet( );
         }
         catch ( Throwable t ) {
             log.error( "Unable to communicate with elasticsearch" );

Reply via email to