in memory service fixed

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/eb823f6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/eb823f6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/eb823f6b

Branch: refs/heads/USERGRID-614
Commit: eb823f6bd9af37bd4403eec68f6015ae0c484eaf
Parents: c22f30a
Author: Shawn Feldman <[email protected]>
Authored: Thu May 7 15:33:24 2015 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Thu May 7 15:33:24 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java  |  2 +-
 .../index/AsyncIndexServiceTest.java            | 31 +++++++++++---------
 .../persistence/index/impl/IndexingUtils.java   |  2 +-
 3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 8574e89..8d0e8c3 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -130,7 +130,7 @@ public class InMemoryAsyncEventService implements 
AsyncEventService {
     public void run( Observable<?> observable ) {
         //start it in the background on an i/o thread
         if ( !resolveSynchronously ) {
-            observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+            observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
).subscribe();
         }
         else {
             observable.toBlocking().last();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 4779877..00c082f 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -20,8 +20,10 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.List;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.*;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -39,11 +41,6 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -84,6 +81,9 @@ public abstract class AsyncIndexServiceTest {
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
+    @Inject
+    public EntityIndex entityIndex;
+
 
     private AsyncEventService asyncEventService;
 
@@ -100,7 +100,7 @@ public abstract class AsyncIndexServiceTest {
     }
 
 
-    @Test( timeout = 60000 )
+    @Test( )
     public void testMessageIndexing() throws InterruptedException {
 
         ApplicationScope applicationScope =
@@ -113,9 +113,9 @@ public abstract class AsyncIndexServiceTest {
 
         //write the entity before indexing
         final EntityCollectionManager collectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
+            
entityCollectionManagerFactory.createCollectionManager(applicationScope);
 
-        collectionManager.write( testEntity ).toBlocking().last();
+        collectionManager.write(testEntity).toBlocking().last();
 
         final GraphManager graphManager = 
graphManagerFactory.createEdgeManager( applicationScope );
 
@@ -128,16 +128,19 @@ public abstract class AsyncIndexServiceTest {
         /**
          * Write 10k edges 10 at a time in parallel
          */
-        final Edge connectionSearch = Observable.range( 0, 10000 ).flatMap( 
integer -> {
-            final Id connectingId = createId( "connecting" );
-            final Edge edge = CpNamingUtils.createConnectionEdge( 
connectingId, "likes", testEntity.getId() );
 
-            return graphManager.writeEdge( edge ).subscribeOn( Schedulers.io() 
);
-        }, 10 ).toBlocking().last();
+
+        final List<Edge> connectionSearchEdges = Observable.range( 0, 500 
).flatMap(integer -> {
+            final Id connectingId = createId("connecting");
+            final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, 
"likes", testEntity.getId());
+
+            return graphManager.writeEdge(edge).subscribeOn(Schedulers.io());
+        }).toList().toBlocking().last();
 
 
         asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity 
);
 
+        entityIndex.refreshAsync().toBlocking().last();
 
         //        Thread.sleep( 1000000000000l );
 
@@ -155,7 +158,7 @@ public abstract class AsyncIndexServiceTest {
         assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
 
 
-        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
+        final SearchEdge connectionSearchEdge = 
CpNamingUtils.createSearchEdgeFromSource(connectionSearchEdges.get(connectionSearchEdges.size()-1)
 );
 
 
         //query until it's available

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/eb823f6b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 4c1e1c0..800d73b 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -152,7 +152,7 @@ public class IndexingUtils {
         sb.append( version.toString() );
 
         sb.append( FIELD_SEPERATOR );
-        sb.append(idString( searchEdge.getNodeId() ));
+        idString( sb, searchEdge.getNodeId() );
         sb.append( FIELD_SEPERATOR );
         sb.append( searchEdge.getEdgeName() );
         sb.append( FIELD_SEPERATOR );

Reply via email to