Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev f2aa40325 -> 28a8590cd


Fixes index runtime execution for in memory to test


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5f99ee2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5f99ee2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5f99ee2d

Branch: refs/heads/two-dot-o-dev
Commit: 5f99ee2de640093335a73ffcfce3e2b07e8e146c
Parents: 3480a36
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Apr 21 14:12:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Apr 21 14:12:34 2015 -0600

----------------------------------------------------------------------
 .../index/AsyncIndexProvider.java               |   5 +-
 .../index/InMemoryAsyncReIndexService.java      |  43 +++++--
 .../corepersistence/index/IndexService.java     |  12 +-
 .../corepersistence/index/IndexServiceImpl.java |  55 ++++++---
 .../corepersistence/TestIndexModule.java        |  14 ++-
 .../corepersistence/index/IndexServiceTest.java | 112 +++++++++++++++++++
 .../corepersistence/index/PublishRxtest.java    |  39 ++++++-
 .../usergrid/persistence/CollectionIT.java      |   1 -
 .../core/metrics/ObservableTimer.java           |  72 ++++++++++++
 .../core/rx/RxTaskSchedulerImpl.java            |   5 +-
 .../persistence/model/entity/Entity.java        |   2 +-
 .../persistence/index/impl/IndexRequest.java    |   1 -
 .../usergrid/cassandra/SpringResource.java      |  13 ++-
 13 files changed, 321 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
index 2c48c13..18df824 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java
@@ -43,7 +43,6 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
     private final MetricsFactory metricsFactory;
     private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
-    private final AllEntityIdsObservable allEntitiesObservable;
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
 
     private AsyncReIndexService asyncIndexService;
@@ -53,14 +52,12 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
     public AsyncIndexProvider( final QueryFig queryFig, final 
QueueManagerFactory queueManagerFactory,
                                final MetricsFactory metricsFactory, final 
IndexService indexService,
                                final RxTaskScheduler rxTaskScheduler,
-                               final AllEntityIdsObservable 
allEntitiesObservable,
                                final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
         this.queryFig = queryFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
-        this.allEntitiesObservable = allEntitiesObservable;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
     }
 
@@ -85,7 +82,7 @@ public class AsyncIndexProvider implements 
Provider<AsyncReIndexService> {
         switch ( impl ) {
             case LOCAL:
                 return new InMemoryAsyncReIndexService( indexService, 
rxTaskScheduler,
-                    entityCollectionManagerFactory );
+                    entityCollectionManagerFactory, metricsFactory );
             case SQS:
                 return new SQSAsyncReIndexService( queueManagerFactory, 
queryFig, metricsFactory );
             default:

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
index 5ebda87..4908945 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java
@@ -25,11 +25,14 @@ import org.slf4j.LoggerFactory;
 
 import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -39,33 +42,49 @@ import rx.Observable;
 @Singleton
 public class InMemoryAsyncReIndexService implements AsyncReIndexService {
 
-    private static final Logger log = 
LoggerFactory.getLogger(InMemoryAsyncReIndexService.class);
+    private static final Logger log = LoggerFactory.getLogger( 
InMemoryAsyncReIndexService.class );
     private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final Timer timer;
 
 
     @Inject
     public InMemoryAsyncReIndexService( final IndexService indexService, final 
RxTaskScheduler rxTaskScheduler,
-                                        final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+                                        final EntityCollectionManagerFactory 
entityCollectionManagerFactory, final
+                                        MetricsFactory metricsFactory ) {
         this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+
+        timer = metricsFactory.getTimer( InMemoryAsyncReIndexService.class, 
"IndexTimer" );
     }
 
 
     @Override
-    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity toIndex ) {
+    public void queueEntityIndexUpdate( final ApplicationScope 
applicationScope, final Entity entity ) {
 
         //process the entity immediately
         //only process the same version, otherwise ignore
 
-        Observable.just( toIndex ).doOnNext( entity -> {
-            log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
-            indexService.indexEntity( applicationScope, entity );
-        } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
-    }
 
+        log.debug( "Indexing entity {} in app scope {} ", entity, 
applicationScope );
+
+        final Observable<IndexOperationMessage> edgeObservable = 
indexService.indexEntity( applicationScope, entity );
+
+
+
+        edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() 
).subscribe();
+
+         //now start it
+//        final Timer.Context time = timer.time();
+//
+//        edgeObservable.connect();
+//
+//        time.stop();
+
+
+    }
 
 
     @Override
@@ -75,12 +94,12 @@ public class InMemoryAsyncReIndexService implements 
AsyncReIndexService {
 
         final Id entityId = entityIdScope.getId();
 
-        final Entity
-            entity = entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load(
-            entityId ).toBlocking().lastOrDefault( null );
+        final Entity entity =
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope ).load( entityId ).toBlocking()
+                                          .lastOrDefault( null );
 
 
-        if(entity == null){
+        if ( entity == null ) {
             log.warn( "Could not find entity with id {} in app scope {} ", 
entityId, applicationScope );
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 41fe323..5e9392b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -20,13 +20,13 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
-
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
+import rx.observables.ConnectableObservable;
 
 
 /**
@@ -41,11 +41,11 @@ public interface IndexService {
      * @param applicationScope The scope of the entity
      * @param entity The entity
      *
-     * @return An observable with the count of every batch executed to index 
the entity.  Note that this a cold observable
-     * and must be subscribed to in order to perform the operation.  This also 
makes no assumptions on scheduling.  It is up to the caller
+     * @return A ConnectableObservable with every edge in the batch to index 
the entity.  Note that this a cold observable
+     * and must be subscribed to, then "connect" in order to perform the 
operation.  This also makes no assumptions on scheduling.  It is up to the 
caller
      * to assign the correct scheduler to the observable
      */
-    Observable<Long> indexEntity( final ApplicationScope applicationScope, 
final Entity entity );
+    Observable<IndexOperationMessage> indexEntity( final ApplicationScope 
applicationScope, final Entity entity );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 29dd04a..87e22f4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -20,8 +20,11 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.Collection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -38,6 +41,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.InflectionUtils;
 
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -55,26 +59,30 @@ import static 
org.apache.usergrid.persistence.Schema.getDefaultSchema;
 @Singleton
 public class IndexServiceImpl implements IndexService {
 
+
+    private static final Logger logger = LoggerFactory.getLogger( 
IndexServiceImpl.class );
+
     private final GraphManagerFactory graphManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final EdgesObservable edgesObservable;
     private final IndexFig indexFig;
+    private final Timer indexTimer;
 
 
     @Inject
     public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, 
final EntityIndexFactory entityIndexFactory,
-                             final EdgesObservable edgesObservable, IndexFig 
indexFig ) {
+                             final EdgesObservable edgesObservable, final 
IndexFig indexFig, final MetricsFactory metricsFactory ) {
         this.graphManagerFactory = graphManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.edgesObservable = edgesObservable;
         this.indexFig = indexFig;
+        this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, 
"index.process");
     }
 
 
     @Override
-    public Observable<Long> indexEntity( final ApplicationScope 
applicationScope, final Entity entity ) {
-
-
+    public Observable<IndexOperationMessage> indexEntity( final 
ApplicationScope applicationScope,
+                                                          final Entity entity 
) {
         //bootstrap the lower modules from their caches
         final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
         final ApplicationEntityIndex ei = 
entityIndexFactory.createApplicationEntityIndex( applicationScope );
@@ -91,30 +99,37 @@ public class IndexServiceImpl implements IndexService {
 
 
         //we might or might not need to index from target-> source
+        final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, 
entityId );
 
 
-        final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, 
entityId );
+        //merge the edges together
+        final Observable<IndexEdge> observable = Observable.merge( 
sourceEdgesToIndex, targetSizes);
+        //do our observable for batching
+        //try to send a whole batch if we can
 
 
-        //start the observable via publish
-        final ConnectableObservable<IndexOperationMessage> observable =
-            //try to send a whole batch if we can
-            Observable.merge( sourceEdgesToIndex, targetSizes ).buffer( 
indexFig.getIndexBatchSize() )
+        //do our observable for batching
+        //try to send a whole batch if we can
+        final Observable<IndexOperationMessage>  batches =  observable.buffer( 
indexFig.getIndexBatchSize() )
 
-                //map into batches based on our buffer size
-                .flatMap( buffer -> Observable.from( buffer ).collect( () -> 
ei.createBatch(),
-                    ( batch, indexEdge ) -> batch.index( indexEdge, entity ) )
+            //map into batches based on our buffer size
+            .flatMap( buffer -> Observable.from( buffer )
+                //collect results into a single batch
+                .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
+                    logger.debug( "adding edge {} to batch for entity {}", 
indexEdge, entity );
+                    batch.index( indexEdge, entity );
+                } )
                     //return the future from the batch execution
-                    .flatMap( batch -> Observable.from( batch.execute() ) ) 
).publish();
+                .flatMap( batch -> Observable.from( batch.execute() ) ) );
 
+        return ObservableTimer.time( batches, indexTimer );
+    }
 
 
-        return observable.countLong();
-    }
 
 
     /**
-     * Get index edgs to the target
+     * Get index edges to the target
      *
      * @param graphManager The graph manager
      * @param entityId The entitie's id
@@ -147,6 +162,10 @@ public class IndexServiceImpl implements IndexService {
          *
          * we're indexing from target->source here
          */
-        return edgesObservable.getEdgesFromSource( graphManager, entityId, 
linkedCollection ).map( edge -> generateScopeToTarget( edge ) );
+        return edgesObservable.getEdgesFromSource( graphManager, entityId, 
linkedCollection )
+                              .map( edge -> generateScopeToTarget( edge ) );
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
index e5254d1..95000bf 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
@@ -19,17 +19,25 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.springframework.context.ApplicationContext;
+
+import org.apache.usergrid.persistence.PersistenceModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
+import org.apache.usergrid.setup.ConcurrentProcessSingleton;
 
 
 public class TestIndexModule extends TestModule {
 
+
     @Override
     protected void configure() {
 
-        //this will break, we need to untagle this and move to guice in core 
completely
-        install( new CoreModule( ));
-    }
 
+        //TODO, refactor to guice to get rid of this
+        final ApplicationContext singleton = 
ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext();
 
+        //this will break, we need to untagle this and move to guice in core 
completely
+        install( new CoreModule() );
+        install( new PersistenceModule( singleton ) );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
new file mode 100644
index 0000000..b0f3995
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.*;
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+public class IndexServiceTest {
+
+    @Inject
+    public IndexService indexService;
+
+
+    @Inject
+    public GraphManagerFactory graphManagerFactory;
+
+    public GraphManager graphManager;
+
+    public ApplicationScope applicationScope;
+
+    @Before
+    public void setup(){
+        applicationScope = getApplicationScope( UUIDGenerator.newTimeUUID());
+
+        graphManager = graphManagerFactory.createEdgeManager( applicationScope 
);
+    }
+
+
+    @Test
+    public void testSingleIndexFromSource(){
+        final Entity entity = new Entity( createId( "test" ), 
UUIDGenerator.newTimeUUID());
+        entity.setField( new StringField( "string", "foo" ) );
+
+        final Edge collectionEdge =  createCollectionEdge( 
applicationScope.getApplication(), "tests", entity.getId() );
+
+        //write the edge
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+
+
+        //index the edge
+        final Observable<IndexOperationMessage> indexed = 
indexService.indexEntity( applicationScope, entity );
+
+
+        //real users should never call to blocking, we're not sure what we'll 
get
+        final IndexOperationMessage results =  indexed.toBlocking().last();
+
+        final Set<IndexRequest> indexRequests = results.getIndexRequests();
+
+        //ensure our value made it to the index request
+        final IndexRequest indexRequest = indexRequests.iterator().next();
+
+        assertNotNull(indexRequest);
+
+
+
+//        assertEquals(applicationScope.getApplication(), indexRequest.);
+//        assertEquals(collectionEdge.getTimestamp(), edge.getTimestamp());
+//        assertEquals(collectionEdge.getType(), edge.getEdgeName());
+//        assertEquals( SearchEdge.NodeType.TARGET, edge.getNodeType());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java
index 1eb00c2..84d3ef7 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.index;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
 import rx.Observable;
@@ -30,6 +31,7 @@ import rx.Subscription;
 import rx.observables.ConnectableObservable;
 import rx.schedulers.Schedulers;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -54,8 +56,41 @@ public class PublishRxTest {
 
         assertTrue( "publish1 behaves as expected", completed );
 
-        final boolean completedSubscription =  
connectedObservable.isUnsubscribed();;
+        final boolean completedSubscription = 
connectedObservable.isUnsubscribed();
+        ;
 
-        assertTrue("Subscription complete", completedSubscription);
+        assertTrue( "Subscription complete", completedSubscription );
+    }
+
+
+    @Test
+    @Ignore("This seems like it should work, yet blocks forever")
+    public void testConnectableObserver() throws InterruptedException {
+
+        final int count = 10;
+
+        final CountDownLatch latch = new CountDownLatch( count );
+
+        final ConnectableObservable<Integer> connectedObservable = 
Observable.range( 0, count ).publish();
+
+
+        //connect to our latch, which should run on it's own subscription
+        //start our latch running
+        connectedObservable.doOnNext( integer -> latch.countDown() 
).subscribeOn( Schedulers.io() ).subscribe();
+
+
+        final Observable<Integer> countObservable = 
connectedObservable.subscribeOn( Schedulers.io() ).count();
+
+        //start the sequence
+        connectedObservable.connect();
+
+
+        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+        assertTrue( "publish1 behaves as expected", completed );
+
+        final int returnedCount = countObservable.toBlocking().last();
+
+        assertEquals( "Counts the same", count, returnedCount );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java 
b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index d1f746a..fcdc11f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -314,7 +314,6 @@ public class CollectionIT extends AbstractCoreIT {
         assertNotNull( user );
 
         app.refreshIndex();
-        Thread.sleep( 100 );
 
         // EntityRef
         final Query query = Query.fromQL( "lastname = '" + lastName + "'" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java
new file mode 100644
index 0000000..c8ffeb8
--- /dev/null
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.metrics;
+
+
+import com.codahale.metrics.Timer;
+
+import rx.Observable;
+
+
+/**
+ * A wrapper class that will allows timing around an observable.  Simply pass 
an observable and we will set a start and
+ * stop issue.
+ */
+public class ObservableTimer {
+
+
+    private final Timer timer;
+    private Timer.Context context;
+
+
+    /**
+     * Intentionally private, use the factory
+     *
+     * @param timer
+     */
+    private ObservableTimer( final Timer timer ) {this.timer = timer;}
+
+
+    /**
+     * Start the timer
+     */
+    public void start() {
+        context = timer.time();
+    }
+
+
+    /**
+     * Stop the timer
+     */
+    public void stop() {
+        context.stop();
+    }
+
+
+    /**
+     * Time the obserable with the specified timer
+     */
+    public static <T> Observable<T> time( final Observable<T> observable, 
final Timer timer ) {
+        final ObservableTimer proxy = new ObservableTimer( timer );
+
+        //attach to the observable
+        return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( 
() -> proxy.stop() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
index 311f7f4..f2950f4 100644
--- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -100,8 +100,11 @@ public class RxTaskSchedulerImpl implements 
RxTaskScheduler {
         public Thread newThread( final Runnable r ) {
             final long newValue = threadCounter.incrementAndGet();
 
-            Thread t = new Thread( r, poolName + "-" + newValue );
+            final String threadName = poolName + "-" + newValue;
 
+            Thread t = new Thread( r, threadName  );
+
+            //set it to be a daemon thread so it doesn't block shutdown
             t.setDaemon( true );
 
             return t;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
 
b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
index 3296203..ae8662f 100644
--- 
a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
+++ 
b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
@@ -69,7 +69,7 @@ public class Entity extends EntityObject {
         this.id = id;
     }
 
-    protected Entity(Id id, UUID version){
+    public Entity(Id id, UUID version){
         this(id);
         this.version = version;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
index a2a1c4d..5852394 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -41,7 +41,6 @@ import static 
org.apache.usergrid.persistence.index.impl.IndexingUtils.idString;
 /**
  * Represent the properties required to build an index request
  */
-@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = 
JsonTypeInfo.As.PROPERTY, property = "@class" )
 public class IndexRequest implements BatchRequest {
 
     public String writeAlias;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java
----------------------------------------------------------------------
diff --git 
a/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java
 
b/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java
index 2370c4b..7d3782d 100644
--- 
a/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java
+++ 
b/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java
@@ -17,12 +17,9 @@
 package org.apache.usergrid.cassandra;
 
 
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-import org.safehaus.guicyfig.EnvironResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
@@ -93,6 +90,14 @@ public class SpringResource {
 
 
     /**
+     * Use this with care.  You should use getBean in most situations
+     * @return
+     */
+    public ApplicationContext getAppContext(){
+        return applicationContext;
+    }
+
+    /**
      * Gets a bean from the application context.
      *
      * @param requiredType the type of the bean

Reply via email to