Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev ff2b3f142 -> 07e2a4145
synchronous tests Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ec89708d Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ec89708d Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ec89708d Branch: refs/heads/two-dot-o-dev Commit: ec89708d83eeea4a75c558cbeb378d5e5e8466f1 Parents: bd3d939 Author: Shawn Feldman <sfeld...@apache.org> Authored: Mon Apr 27 13:19:21 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Mon Apr 27 13:19:21 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/index/AsyncIndexProvider.java | 3 ++- .../index/InMemoryAsyncIndexService.java | 13 ++++++++++--- .../corepersistence/index/IndexProcessorFig.java | 3 +++ .../index/InMemoryAsycIndexServiceTest.java | 2 +- .../src/test/resources/usergrid-custom-test.properties | 1 + .../persistence/core/future/FutureObservable.java | 7 ------- 6 files changed, 17 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index 0043166..c4f34de 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -81,7 +81,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { switch ( impl ) { case LOCAL: return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, - 
entityCollectionManagerFactory ); + entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously()); case SQS: return new SQSAsyncIndexService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService, entityCollectionManagerFactory, rxTaskScheduler ); @@ -108,6 +108,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { * Different implementations */ public static enum Implementations { + TEST, LOCAL, SQS; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java index e8d178c..3f59b0c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java @@ -35,6 +35,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; +import rx.Observer; @Singleton @@ -45,19 +46,21 @@ public class InMemoryAsyncIndexService implements AsyncIndexService { private final IndexService indexService; private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final boolean resolveSynchronously; @Inject public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + final EntityCollectionManagerFactory entityCollectionManagerFactory, boolean resolveSynchronously ) { this.indexService = indexService; this.rxTaskScheduler = rxTaskScheduler; this.entityCollectionManagerFactory = 
entityCollectionManagerFactory; + this.resolveSynchronously = resolveSynchronously; } @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) { + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { //process the entity immediately //only process the same version, otherwise ignore @@ -68,7 +71,11 @@ public class InMemoryAsyncIndexService implements AsyncIndexService { final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity ); //start it in the background on an i/o thread - edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + if(!resolveSynchronously){ + edgeObservable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + }else { + edgeObservable.toBlocking().last(); + } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 1e8abff..fe9d3fd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -83,4 +83,7 @@ public interface IndexProcessorFig extends GuicyFig { long getReIndexSampleInterval(); + @Default("false") + @Key("elasticsearch.queue_impl.resolution") + boolean resolveSynchronously(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java index 43968de..e3c59c0 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java @@ -54,7 +54,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest { @Override protected AsyncIndexService getAsyncIndexService() { - return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, entityCollectionManagerFactory ); + return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/core/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties index 703424f..a69e01f 100644 --- a/stack/core/src/test/resources/usergrid-custom-test.properties +++ b/stack/core/src/test/resources/usergrid-custom-test.properties @@ -23,5 +23,6 @@ cassandra.connections=1000 elasticsearch.number_shards=1 elasticsearch.number_replicas=0 elasticsearch.index_prefix=core_tests +elasticsearch.queue_impl.resolution=true http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec89708d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java index ea85f7c..f86ffc1 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java @@ -19,8 +19,6 @@ package org.apache.usergrid.persistence.core.future; import rx.Observable; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -30,7 +28,6 @@ import java.util.concurrent.FutureTask; public class FutureObservable<T> { private final T returnVal; - private Throwable error; private final FutureTask<T> future; @@ -39,10 +36,6 @@ public class FutureObservable<T> { future = new FutureTask<T>( () -> returnVal ); } - public void setError( final Throwable t ) { - this.error = t; - } - public void done() { future.run(); }