In-memory async event service fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/eb823f6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/eb823f6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/eb823f6b Branch: refs/heads/USERGRID-614 Commit: eb823f6bd9af37bd4403eec68f6015ae0c484eaf Parents: c22f30a Author: Shawn Feldman <[email protected]> Authored: Thu May 7 15:33:24 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu May 7 15:33:24 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/InMemoryAsyncEventService.java | 2 +- .../index/AsyncIndexServiceTest.java | 31 +++++++++++--------- .../persistence/index/impl/IndexingUtils.java | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index 8574e89..8d0e8c3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -130,7 +130,7 @@ public class InMemoryAsyncEventService implements AsyncEventService { public void run( Observable<?> observable ) { //start it in the background on an i/o thread if ( !resolveSynchronously ) { - observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + observable.subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } else { observable.toBlocking().last(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 4779877..00c082f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -20,8 +20,10 @@ package org.apache.usergrid.corepersistence.index; +import java.util.List; import java.util.UUID; +import org.apache.usergrid.persistence.index.*; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -39,11 +41,6 @@ import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.CandidateResults; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.impl.EsRunner; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -84,6 +81,9 @@ public abstract class AsyncIndexServiceTest { @Inject public EntityIndexFactory entityIndexFactory; + @Inject + public EntityIndex entityIndex; + private AsyncEventService asyncEventService; @@ -100,7 +100,7 @@ public abstract 
class AsyncIndexServiceTest { } - @Test( timeout = 60000 ) + @Test( ) public void testMessageIndexing() throws InterruptedException { ApplicationScope applicationScope = @@ -113,9 +113,9 @@ public abstract class AsyncIndexServiceTest { //write the entity before indexing final EntityCollectionManager collectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); + entityCollectionManagerFactory.createCollectionManager(applicationScope); - collectionManager.write( testEntity ).toBlocking().last(); + collectionManager.write(testEntity).toBlocking().last(); final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); @@ -128,16 +128,19 @@ public abstract class AsyncIndexServiceTest { /** * Write 10k edges 10 at a time in parallel */ - final Edge connectionSearch = Observable.range( 0, 10000 ).flatMap( integer -> { - final Id connectingId = createId( "connecting" ); - final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() ); - return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() ); - }, 10 ).toBlocking().last(); + + final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> { + final Id connectingId = createId("connecting"); + final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId()); + + return graphManager.writeEdge(edge).subscribeOn(Schedulers.io()); + }).toList().toBlocking().last(); asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); + entityIndex.refreshAsync().toBlocking().last(); // Thread.sleep( 1000000000000l ); @@ -155,7 +158,7 @@ public abstract class AsyncIndexServiceTest { assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() ); - final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch ); + final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource(connectionSearchEdges.get(connectionSearchEdges.size()-1) ); //query until it's available http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java index 4c1e1c0..800d73b 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java @@ -152,7 +152,7 @@ public class IndexingUtils { sb.append( version.toString() ); sb.append( FIELD_SEPERATOR ); - sb.append(idString( searchEdge.getNodeId() )); + idString( sb, searchEdge.getNodeId() ); sb.append( FIELD_SEPERATOR ); sb.append( searchEdge.getEdgeName() ); sb.append( FIELD_SEPERATOR );
