Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev f2aa40325 -> 28a8590cd
Fixes index runtime execution for in memory to test Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5f99ee2d Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5f99ee2d Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5f99ee2d Branch: refs/heads/two-dot-o-dev Commit: 5f99ee2de640093335a73ffcfce3e2b07e8e146c Parents: 3480a36 Author: Todd Nine <tn...@apigee.com> Authored: Tue Apr 21 14:12:34 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Apr 21 14:12:34 2015 -0600 ---------------------------------------------------------------------- .../index/AsyncIndexProvider.java | 5 +- .../index/InMemoryAsyncReIndexService.java | 43 +++++-- .../corepersistence/index/IndexService.java | 12 +- .../corepersistence/index/IndexServiceImpl.java | 55 ++++++--- .../corepersistence/TestIndexModule.java | 14 ++- .../corepersistence/index/IndexServiceTest.java | 112 +++++++++++++++++++ .../corepersistence/index/PublishRxtest.java | 39 ++++++- .../usergrid/persistence/CollectionIT.java | 1 - .../core/metrics/ObservableTimer.java | 72 ++++++++++++ .../core/rx/RxTaskSchedulerImpl.java | 5 +- .../persistence/model/entity/Entity.java | 2 +- .../persistence/index/impl/IndexRequest.java | 1 - .../usergrid/cassandra/SpringResource.java | 13 ++- 13 files changed, 321 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index 2c48c13..18df824 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -43,7 +43,6 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { private final MetricsFactory metricsFactory; private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; - private final AllEntityIdsObservable allEntitiesObservable; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private AsyncReIndexService asyncIndexService; @@ -53,14 +52,12 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final AllEntityIdsObservable allEntitiesObservable, final EntityCollectionManagerFactory entityCollectionManagerFactory ) { this.queryFig = queryFig; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; - this.allEntitiesObservable = allEntitiesObservable; this.entityCollectionManagerFactory = entityCollectionManagerFactory; } @@ -85,7 +82,7 @@ public class AsyncIndexProvider implements Provider<AsyncReIndexService> { switch ( impl ) { case LOCAL: return new InMemoryAsyncReIndexService( indexService, rxTaskScheduler, - entityCollectionManagerFactory ); + entityCollectionManagerFactory, metricsFactory ); case SQS: return new SQSAsyncReIndexService( queueManagerFactory, queryFig, metricsFactory ); default: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java index 5ebda87..4908945 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java @@ -25,11 +25,14 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -39,33 +42,49 @@ import rx.Observable; @Singleton public class InMemoryAsyncReIndexService implements AsyncReIndexService { - private static final Logger log = LoggerFactory.getLogger(InMemoryAsyncReIndexService.class); + private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncReIndexService.class ); private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final Timer timer; @Inject public InMemoryAsyncReIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + final EntityCollectionManagerFactory entityCollectionManagerFactory, final + MetricsFactory metricsFactory ) { this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; this.entityCollectionManagerFactory = entityCollectionManagerFactory; + + timer = metricsFactory.getTimer( InMemoryAsyncReIndexService.class, "IndexTimer" ); } @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity toIndex ) { + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { //process the entity immediately //only process the same version, otherwise ignore - Observable.just( toIndex ).doOnNext( entity -> { - log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); - indexService.indexEntity( applicationScope, entity ); - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); - } + log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); + + final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); + + + + edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + + //now start it +// final Timer.Context time = timer.time(); +// +// edgeObservable.connect(); +// +// time.stop(); + + + } @Override @@ -75,12 +94,12 @@ public class InMemoryAsyncReIndexService implements AsyncReIndexService { final Id entityId = entityIdScope.getId(); - final Entity - entity = entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( - entityId ).toBlocking().lastOrDefault( null ); + final Entity entity = + entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).toBlocking() + .lastOrDefault( null ); - if(entity == null){ + if ( entity == null ) { log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java index 41fe323..5e9392b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -20,13 +20,13 @@ package org.apache.usergrid.corepersistence.index; -import java.util.UUID; - import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.IndexEdge; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; +import rx.observables.ConnectableObservable; /** @@ -41,11 +41,11 @@ public interface IndexService { * @param applicationScope The scope of the entity * @param entity The entity * - * @return An observable with the count of every batch executed to index the entity. Note that this a cold observable - * and must be subscribed to in order to perform the operation. This also makes no assumptions on scheduling. It is up to the caller + * @return A ConnectableObservable with every edge in the batch to index the entity. Note that this a cold observable + * and must be subscribed to, then "connect" in order to perform the operation. This also makes no assumptions on scheduling. It is up to the caller * to assign the correct scheduler to the observable */ - Observable<Long> indexEntity( final ApplicationScope applicationScope, final Entity entity ); + Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope, final Entity entity ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 29dd04a..87e22f4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -20,8 +20,11 @@ package org.apache.usergrid.corepersistence.index; -import java.util.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.graph.Edge; @@ -38,6 +41,7 @@ import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.schema.CollectionInfo; import org.apache.usergrid.utils.InflectionUtils; +import com.codahale.metrics.Timer; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -55,26 +59,30 @@ import static org.apache.usergrid.persistence.Schema.getDefaultSchema; @Singleton public class IndexServiceImpl implements IndexService { + + private static final Logger logger = LoggerFactory.getLogger( IndexServiceImpl.class ); + private final GraphManagerFactory graphManagerFactory; private final EntityIndexFactory entityIndexFactory; private final EdgesObservable edgesObservable; private final IndexFig indexFig; + private final Timer indexTimer; @Inject public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory, - final EdgesObservable edgesObservable, IndexFig indexFig ) { + final EdgesObservable edgesObservable, final IndexFig indexFig, final MetricsFactory metricsFactory ) { this.graphManagerFactory = graphManagerFactory; this.entityIndexFactory = entityIndexFactory; this.edgesObservable = edgesObservable; this.indexFig = indexFig; + this.indexTimer = metricsFactory.getTimer( IndexServiceImpl.class, "index.process"); } @Override - public Observable<Long> indexEntity( final ApplicationScope applicationScope, final Entity entity ) { - - + public Observable<IndexOperationMessage> indexEntity( final ApplicationScope applicationScope, + final Entity entity ) { //bootstrap the lower modules from their caches final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); @@ -91,30 +99,37 @@ public class IndexServiceImpl implements IndexService { //we might or might not need to index from target-> source + final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId ); - final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId ); + //merge the edges together + final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes); + //do our observable for batching + //try to send a whole batch if we can - //start the observable via publish - final ConnectableObservable<IndexOperationMessage> observable = - //try to send a whole batch if we can - Observable.merge( sourceEdgesToIndex, targetSizes ).buffer( indexFig.getIndexBatchSize() ) + //do our observable for batching + //try to send a whole batch if we can + final Observable<IndexOperationMessage> batches = observable.buffer( indexFig.getIndexBatchSize() ) - //map into batches based on our buffer size - .flatMap( buffer -> Observable.from( buffer ).collect( () -> ei.createBatch(), - ( batch, indexEdge ) -> batch.index( indexEdge, entity ) ) + //map into batches based on our buffer size + .flatMap( buffer -> Observable.from( buffer ) + //collect results into a single batch + .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> { + logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity ); + batch.index( indexEdge, entity ); + } ) //return the future from the batch execution - .flatMap( batch -> Observable.from( batch.execute() ) ) ).publish(); + .flatMap( batch -> Observable.from( batch.execute() ) ) ); + return ObservableTimer.time( batches, indexTimer ); + } - return observable.countLong(); - } /** - * Get index edgs to the target + * Get index edges to the target * * @param graphManager The graph manager * @param entityId The entitie's id @@ -147,6 +162,10 @@ public class IndexServiceImpl implements IndexService { * * we're indexing from target->source here */ - return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ).map( edge -> generateScopeToTarget( edge ) ); + return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ) + .map( edge -> generateScopeToTarget( edge ) ); } + + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java index e5254d1..95000bf 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java @@ -19,17 +19,25 @@ package org.apache.usergrid.corepersistence; +import org.springframework.context.ApplicationContext; + +import org.apache.usergrid.persistence.PersistenceModule; import org.apache.usergrid.persistence.core.guice.TestModule; +import org.apache.usergrid.setup.ConcurrentProcessSingleton; public class TestIndexModule extends TestModule { + @Override protected void configure() { - //this will break, we need to untagle this and move to guice in core completely - install( new CoreModule( )); - } + //TODO, refactor to guice to get rid of this + final ApplicationContext singleton = ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext(); + //this will break, we need to untagle this and move to guice in core completely + install( new CoreModule() ); + install( new PersistenceModule( singleton ) ); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java new file mode 100644 index 0000000..b0f3995 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.List; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.usergrid.corepersistence.TestIndexModule; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.index.IndexEdge; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.impl.EsRunner; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.index.impl.IndexRequest; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.field.StringField; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.google.inject.Inject; + +import rx.Observable; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope; +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.*; + + +@RunWith(EsRunner.class) +@UseModules({ TestIndexModule.class }) +public class IndexServiceTest { + + @Inject + public IndexService indexService; + + + @Inject + public GraphManagerFactory graphManagerFactory; + + public GraphManager graphManager; + + public ApplicationScope applicationScope; + + @Before + public void setup(){ + applicationScope = getApplicationScope( UUIDGenerator.newTimeUUID()); + + graphManager = graphManagerFactory.createEdgeManager( applicationScope ); + } + + + @Test + public void testSingleIndexFromSource(){ + final Entity entity = new Entity( createId( "test" ), UUIDGenerator.newTimeUUID()); + entity.setField( new StringField( "string", "foo" ) ); + + final Edge collectionEdge = createCollectionEdge( applicationScope.getApplication(), "tests", entity.getId() ); + + //write the edge + graphManager.writeEdge( collectionEdge ).toBlocking().last(); + + + //index the edge + final Observable<IndexOperationMessage> indexed = indexService.indexEntity( applicationScope, entity ); + + + //real users should never call to blocking, we're not sure what we'll get + final IndexOperationMessage results = indexed.toBlocking().last(); + + final Set<IndexRequest> indexRequests = results.getIndexRequests(); + + //ensure our value made it to the index request + final IndexRequest indexRequest = indexRequests.iterator().next(); + + assertNotNull(indexRequest); + + + +// assertEquals(applicationScope.getApplication(), indexRequest.); +// assertEquals(collectionEdge.getTimestamp(), edge.getTimestamp()); +// assertEquals(collectionEdge.getType(), edge.getEdgeName()); +// assertEquals( SearchEdge.NodeType.TARGET, edge.getNodeType()); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java index 1eb00c2..84d3ef7 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxtest.java @@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.index; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.Ignore; import org.junit.Test; import rx.Observable; @@ -30,6 +31,7 @@ import rx.Subscription; import rx.observables.ConnectableObservable; import rx.schedulers.Schedulers; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -54,8 +56,41 @@ public class PublishRxTest { assertTrue( "publish1 behaves as expected", completed ); - final boolean completedSubscription = connectedObservable.isUnsubscribed();; + final boolean completedSubscription = connectedObservable.isUnsubscribed(); + ; - assertTrue("Subscription complete", completedSubscription); + assertTrue( "Subscription complete", completedSubscription ); + } + + + @Test + @Ignore("This seems like it should work, yet blocks forever") + public void testConnectableObserver() throws InterruptedException { + + final int count = 10; + + final CountDownLatch latch = new CountDownLatch( count ); + + final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish(); + + + //connect to our latch, which should run on it's own subscription + //start our latch running + connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe(); + + + final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count(); + + //start the sequence + connectedObservable.connect(); + + + final boolean completed = latch.await( 5, TimeUnit.SECONDS ); + + assertTrue( "publish1 behaves as expected", completed ); + + final int returnedCount = countObservable.toBlocking().last(); + + assertEquals( "Counts the same", count, returnedCount ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java index d1f746a..fcdc11f 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java @@ -314,7 +314,6 @@ public class CollectionIT extends AbstractCoreIT { assertNotNull( user ); app.refreshIndex(); - Thread.sleep( 100 ); // EntityRef final Query query = Query.fromQL( "lastname = '" + lastName + "'" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java new file mode 100644 index 0000000..c8ffeb8 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/ObservableTimer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.core.metrics; + + +import com.codahale.metrics.Timer; + +import rx.Observable; + + +/** + * A wrapper class that will allows timing around an observable. Simply pass an observable and we will set a start and + * stop issue. + */ +public class ObservableTimer { + + + private final Timer timer; + private Timer.Context context; + + + /** + * Intentionally private, use the factory + * + * @param timer + */ + private ObservableTimer( final Timer timer ) {this.timer = timer;} + + + /** + * Start the timer + */ + public void start() { + context = timer.time(); + } + + + /** + * Stop the timer + */ + public void stop() { + context.stop(); + } + + + /** + * Time the obserable with the specified timer + */ + public static <T> Observable<T> time( final Observable<T> observable, final Timer timer ) { + final ObservableTimer proxy = new ObservableTimer( timer ); + + //attach to the observable + return observable.doOnSubscribe( () -> proxy.start() ).doOnCompleted( () -> proxy.stop() ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java index 311f7f4..f2950f4 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java @@ -100,8 +100,11 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler { public Thread newThread( final Runnable r ) { final long newValue = threadCounter.incrementAndGet(); - Thread t = new Thread( r, poolName + "-" + newValue ); + final String threadName = poolName + "-" + newValue; + Thread t = new Thread( r, threadName ); + + //set it to be a daemon thread so it doesn't block shutdown t.setDaemon( true ); return t; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java index 3296203..ae8662f 100644 --- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java +++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java @@ -69,7 +69,7 @@ public class Entity extends EntityObject { this.id = id; } - protected Entity(Id id, UUID version){ + public Entity(Id id, UUID version){ this(id); this.version = version; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java index a2a1c4d..5852394 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java @@ -41,7 +41,6 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.idString; /** * Represent the properties required to build an index request */ -@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" ) public class IndexRequest implements BatchRequest { public String writeAlias; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5f99ee2d/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java ---------------------------------------------------------------------- diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java b/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java index 2370c4b..7d3782d 100644 --- a/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java +++ b/stack/test-utils/src/main/java/org/apache/usergrid/cassandra/SpringResource.java @@ -17,12 +17,9 @@ package org.apache.usergrid.cassandra; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; -import org.safehaus.guicyfig.EnvironResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -93,6 +90,14 @@ public class SpringResource { /** + * Use this with care. You should use getBean in most situations + * @return + */ + public ApplicationContext getAppContext(){ + return applicationContext; + } + + /** * Gets a bean from the application context. * * @param requiredType the type of the bean