Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 234f873d0 -> bd3d93959
future -> obs Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bd3d9395 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bd3d9395 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bd3d9395 Branch: refs/heads/two-dot-o-dev Commit: bd3d93959b86def18f3fe41c23194543fc1fcb17 Parents: 234f873 Author: Shawn Feldman <sfeld...@apache.org> Authored: Mon Apr 27 09:06:32 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Mon Apr 27 09:06:32 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/index/IndexServiceImpl.java | 2 +- .../persistence/core/future/BetterFuture.java | 68 -------------------- .../core/future/FutureObservable.java | 53 +++++++++++++++ .../persistence/index/EntityIndexBatch.java | 5 +- .../index/impl/EsEntityIndexBatchImpl.java | 7 +- .../index/impl/EsEntityIndexImpl.java | 6 +- .../index/impl/EsIndexBufferConsumerImpl.java | 8 +-- .../index/impl/IndexBufferConsumer.java | 4 +- .../index/impl/IndexOperationMessage.java | 19 ++---- .../persistence/index/impl/EntityIndexTest.java | 36 +++++------ .../persistence/index/impl/GeoPagingTest.java | 2 +- 11 files changed, 96 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 81bf6cb..6cfeefb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -120,7 +120,7 @@ public class IndexServiceImpl implements IndexService { batch.index( indexEdge, entity ); } ) //return the future from the batch execution - .flatMap( batch -> Observable.from( batch.execute() ) ) ); + .flatMap( batch -> batch.execute() ) ); return ObservableTimer.time( batches, indexTimer ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java deleted file mode 100644 index 777ac52..0000000 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.persistence.core.future; - - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; - - -/** - * Future without the exception nastiness - */ -public class BetterFuture<T> extends FutureTask<T> { - - private Throwable error; - - - public BetterFuture( Callable<T> callable ) { - super( callable ); - } - - - public void setError( final Throwable t ) { - this.error = t; - } - - - public void done() { - run(); - } - - - public T get() { - - T returnValue = null; - - try { - returnValue = super.get(); - } - catch ( InterruptedException e ) { - //swallow - } - catch ( ExecutionException e ) { - //swallow - } - - if ( error != null ) { - throw new RuntimeException( "Error in getting future", error ); - } - - return returnValue; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java new file mode 100644 index 0000000..ea85f7c --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.persistence.core.future; + + +import rx.Observable; + +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + + +/** + * Future without the exception nastiness + */ +public class FutureObservable<T> { + + private final T returnVal; + private Throwable error; + private final FutureTask<T> future; + + + public FutureObservable(final T returnVal) { + this.returnVal = returnVal; + future = new FutureTask<T>( () -> returnVal ); + } + + public void setError( final Throwable t ) { + this.error = t; + } + + public void done() { + future.run(); + } + + public Observable<T> observable() { + return !future.isDone() ? Observable.from(future) : Observable.just(returnVal); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index 39c686c..85b234a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -20,10 +20,11 @@ package org.apache.usergrid.persistence.index;/* import java.util.UUID; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.future.FutureObservable; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import rx.Observable; public interface EntityIndexBatch { @@ -66,7 +67,7 @@ public interface EntityIndexBatch { * Execute the batch * @return future to guarantee execution */ - BetterFuture<IndexOperationMessage> execute(); + Observable<IndexOperationMessage> execute(); /** * Get the number of operations in the batch http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 66849ee..77a7c14 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -24,7 +24,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.future.FutureObservable; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.index.AliasedEntityIndex; @@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.utils.IndexValidationUtils; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; +import rx.Observable; public class EsEntityIndexBatchImpl implements EntityIndexBatch { @@ -125,7 +126,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override - public BetterFuture execute() { + public Observable execute() { IndexOperationMessage tempContainer = container; container = new IndexOperationMessage(); @@ -134,7 +135,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { */ if ( tempContainer.isEmpty() ) { tempContainer.done(); - return tempContainer.getFuture(); + return tempContainer.observable(); } return indexBatchBufferProducer.put( tempContainer ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index c2bb441..fba6976 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -26,7 +26,7 @@ import com.google.common.io.Resources; import com.google.inject.Inject; import com.google.inject.Singleton; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.future.FutureObservable; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.VersionedData; import org.apache.usergrid.persistence.core.util.Health; @@ -323,8 +323,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData { public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() { refreshIndexMeter.mark(); - BetterFuture future = producer.put(new IndexOperationMessage()); - future.get(); + Observable future = producer.put(new IndexOperationMessage()); + future.toBlocking().last(); return indexRefreshCommand.execute(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 3f41eb3..b2433bd 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -33,7 +33,7 @@ import org.elasticsearch.client.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.future.FutureObservable; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.IndexFig; @@ -108,14 +108,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { } - public BetterFuture put( IndexOperationMessage message ) { - Preconditions.checkNotNull( message, "Message cannot be null" ); + public Observable put( IndexOperationMessage message ) { + Preconditions.checkNotNull(message, "Message cannot be null"); indexSizeCounter.inc( message.getDeIndexRequests().size() ); indexSizeCounter.inc( message.getIndexRequests().size() ); Timer.Context time = offerTimer.time(); bufferProducer.send( message ); time.stop(); - return message.getFuture(); + return message.observable(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java index 3258444..6898f15 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java @@ -20,7 +20,7 @@ package org.apache.usergrid.persistence.index.impl; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import rx.Observable; /** @@ -34,5 +34,5 @@ public interface IndexBufferConsumer { * @param message * @return */ - BetterFuture put(IndexOperationMessage message); + Observable put(IndexOperationMessage message); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java index e830476..aaad0eb 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java @@ -23,11 +23,11 @@ package org.apache.usergrid.persistence.index.impl; import java.io.Serializable; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Callable; -import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.core.future.FutureObservable; import com.fasterxml.jackson.annotation.JsonIgnore; +import rx.Observable; /** @@ -39,14 +39,13 @@ public class IndexOperationMessage implements Serializable { - private final BetterFuture<IndexOperationMessage> containerFuture; + private final FutureObservable<IndexOperationMessage> containerFuture; public IndexOperationMessage() { - final IndexOperationMessage parent = this; this.indexRequests = new HashSet<>(); this.deIndexRequests = new HashSet<>(); - this.containerFuture = new BetterFuture<>( () -> parent ); + this.containerFuture = new FutureObservable<>( this ); } @@ -81,8 +80,8 @@ public class IndexOperationMessage implements Serializable { * return the promise */ @JsonIgnore - public BetterFuture<IndexOperationMessage> getFuture() { - return containerFuture; + public Observable<IndexOperationMessage> observable() { + return containerFuture.observable(); } @@ -117,10 +116,6 @@ public class IndexOperationMessage implements Serializable { public void done() { //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack - final BetterFuture<IndexOperationMessage> future = getFuture(); - - if(future != null ){ - future.done(); - } + containerFuture.done(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 550b26b..3f23c11 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -137,7 +137,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexEdge, entity1 ); - batch.execute().get(); + batch.execute().toBlocking().last(); Entity entity2 = new Entity( entityType ); @@ -151,7 +151,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexEdge, entity2 ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); @@ -209,7 +209,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); insertJsonBlob( sampleJson, batch, entityType, indexEdge, size, 0 ); - batch.execute().get(); + batch.execute().toBlocking().last(); } catch ( Exception e ) { synchronized ( failTime ) { @@ -307,7 +307,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch entityIndexBatch = entityIndex.createBatch(); entityIndexBatch.deindex( searchEdge, crs.get( 0 ) ); entityIndexBatch.deindex( searchEdge, crs.get( 1 ) ); - entityIndexBatch.execute().get(); + entityIndexBatch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); //Hilda Youn @@ -322,7 +322,7 @@ public class EntityIndexTest extends BaseIT { List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} ); EntityIndexBatch batch = entityIndex.createBatch(); insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex ); - batch.execute().get(); + batch.execute().toBlocking().last(); IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().last(); long time = info.getExecutionTime(); log.info("refresh took ms:"+time); @@ -387,7 +387,7 @@ public class EntityIndexTest extends BaseIT { EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) ); - entityIndex.createBatch().index( searchEdge, entity ).execute().get(); + entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); CandidateResults candidateResults = entityIndex @@ -395,8 +395,8 @@ public class EntityIndexTest extends BaseIT { assertEquals( 1, candidateResults.size() ); EntityIndexBatch batch = entityIndex.createBatch(); - batch.deindex( searchEdge, entity ).execute().get(); - batch.execute().get(); + batch.deindex( searchEdge, entity ).execute().toBlocking().last(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); candidateResults = entityIndex @@ -525,7 +525,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.index( indexSCope, user ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); final String query = "where username = 'edanuff'"; @@ -534,7 +534,7 @@ public class EntityIndexTest extends BaseIT { assertEquals( user.getId(), r.get( 0 ).getId() ); batch.deindex( indexSCope, user.getId(), user.getVersion() ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); // EntityRef @@ -595,7 +595,7 @@ public class EntityIndexTest extends BaseIT { EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() ); batch.index( indexScope, fred ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); final SearchTypes searchTypes = SearchTypes.fromTypes( "user" ); @@ -676,7 +676,7 @@ public class EntityIndexTest extends BaseIT { } - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); @@ -743,7 +743,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.index( indexSCope, user ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); final String query = "where searchUUID = " + searchUUID; @@ -782,7 +782,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.index( indexSCope, user ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); final String query = "where string = 'I am*'"; @@ -839,7 +839,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); batch.index( indexSCope, first ); batch.index( indexSCope, second ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); @@ -904,7 +904,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexScope2, second ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); @@ -985,7 +985,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexScope2, second ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); @@ -1095,7 +1095,7 @@ public class EntityIndexTest extends BaseIT { batch.index( indexScope2, second ); - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java index 1b4b4a5..25bc48c 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java @@ -134,7 +134,7 @@ public class GeoPagingTest extends BaseIT { } - batch.execute().get(); + batch.execute().toBlocking().last(); ei.refreshAsync().toBlocking().last();