Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 234f873d0 -> bd3d93959


future -> obs


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bd3d9395
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bd3d9395
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bd3d9395

Branch: refs/heads/two-dot-o-dev
Commit: bd3d93959b86def18f3fe41c23194543fc1fcb17
Parents: 234f873
Author: Shawn Feldman <sfeld...@apache.org>
Authored: Mon Apr 27 09:06:32 2015 -0600
Committer: Shawn Feldman <sfeld...@apache.org>
Committed: Mon Apr 27 09:06:32 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |  2 +-
 .../persistence/core/future/BetterFuture.java   | 68 --------------------
 .../core/future/FutureObservable.java           | 53 +++++++++++++++
 .../persistence/index/EntityIndexBatch.java     |  5 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  7 +-
 .../index/impl/EsEntityIndexImpl.java           |  6 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  8 +--
 .../index/impl/IndexBufferConsumer.java         |  4 +-
 .../index/impl/IndexOperationMessage.java       | 19 ++----
 .../persistence/index/impl/EntityIndexTest.java | 36 +++++------
 .../persistence/index/impl/GeoPagingTest.java   |  2 +-
 11 files changed, 96 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 81bf6cb..6cfeefb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -120,7 +120,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.index( indexEdge, entity );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> Observable.from( batch.execute() ) ) );
+                .flatMap( batch -> batch.execute() ) );
 
         return ObservableTimer.time( batches, indexTimer );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
deleted file mode 100644
index 777ac52..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.core.future;
-
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-
-
-/**
- * Future without the exception nastiness
- */
-public class BetterFuture<T> extends FutureTask<T> {
-
-    private Throwable error;
-
-
-    public BetterFuture( Callable<T> callable ) {
-        super( callable );
-    }
-
-
-    public void setError( final Throwable t ) {
-        this.error = t;
-    }
-
-
-    public void done() {
-        run();
-    }
-
-
-    public T get() {
-
-        T returnValue = null;
-
-        try {
-            returnValue = super.get();
-        }
-        catch ( InterruptedException e ) {
-            //swallow
-        }
-        catch ( ExecutionException e ) {
-            //swallow
-        }
-
-        if ( error != null ) {
-           throw new RuntimeException( "Error in getting future", error );
-        }
-
-        return returnValue;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
new file mode 100644
index 0000000..ea85f7c
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+
+import rx.Observable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+
+/**
+ * Future without the exception nastiness
+ */
+public class FutureObservable<T> {
+
+    private final T returnVal;
+    private Throwable error;
+    private final FutureTask<T> future;
+
+
+    public FutureObservable(final T returnVal) {
+        this.returnVal = returnVal;
+        future = new FutureTask<T>( () -> returnVal );
+    }
+
+    public void setError( final Throwable t ) {
+        this.error = t;
+    }
+
+    public void done() {
+        future.run();
+    }
+
+    public Observable<T> observable() {
+        return !future.isDone() ? Observable.from(future) : Observable.just(returnVal);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 39c686c..85b234a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -20,10 +20,11 @@ package org.apache.usergrid.persistence.index;/*
 
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
 
 
 public interface EntityIndexBatch {
@@ -66,7 +67,7 @@ public interface EntityIndexBatch {
      * Execute the batch
      * @return future to guarantee execution
      */
-    BetterFuture<IndexOperationMessage> execute();
+    Observable<IndexOperationMessage> execute();
 
     /**
      * Get the number of operations in the batch

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 66849ee..77a7c14 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -24,7 +24,7 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.index.AliasedEntityIndex;
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
 
 
 public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -125,7 +126,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     @Override
-    public BetterFuture execute() {
+    public Observable execute() {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
 
@@ -134,7 +135,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
          */
         if ( tempContainer.isEmpty() ) {
             tempContainer.done();
-            return tempContainer.getFuture();
+            return tempContainer.observable();
         }
 
         return indexBatchBufferProducer.put( tempContainer );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index c2bb441..fba6976 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -26,7 +26,7 @@ import com.google.common.io.Resources;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.util.Health;
@@ -323,8 +323,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
     public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() {
 
         refreshIndexMeter.mark();
-        BetterFuture future = producer.put(new IndexOperationMessage());
-        future.get();
+        Observable future = producer.put(new IndexOperationMessage());
+        future.toBlocking().last();
         return indexRefreshCommand.execute();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 3f41eb3..b2433bd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -33,7 +33,7 @@ import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.future.FutureObservable;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 
@@ -108,14 +108,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     }
 
 
-    public BetterFuture put( IndexOperationMessage message ) {
-        Preconditions.checkNotNull( message, "Message cannot be null" );
+    public Observable put( IndexOperationMessage message ) {
+        Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc( message.getDeIndexRequests().size() );
         indexSizeCounter.inc( message.getIndexRequests().size() );
         Timer.Context time = offerTimer.time();
         bufferProducer.send( message );
         time.stop();
-        return message.getFuture();
+        return message.observable();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index 3258444..6898f15 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import rx.Observable;
 
 
 /**
@@ -34,5 +34,5 @@ public interface IndexBufferConsumer {
      * @param message
      * @return
      */
-    BetterFuture put(IndexOperationMessage message);
+    Observable put(IndexOperationMessage message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index e830476..aaad0eb 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -23,11 +23,11 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.core.future.FutureObservable;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import rx.Observable;
 
 
 /**
@@ -39,14 +39,13 @@ public class IndexOperationMessage implements Serializable {
 
 
 
-    private final BetterFuture<IndexOperationMessage> containerFuture;
+    private final FutureObservable<IndexOperationMessage> containerFuture;
 
 
     public IndexOperationMessage() {
-        final IndexOperationMessage parent = this;
         this.indexRequests = new HashSet<>();
         this.deIndexRequests = new HashSet<>();
-        this.containerFuture = new BetterFuture<>( () -> parent );
+        this.containerFuture = new FutureObservable<>( this );
     }
 
 
@@ -81,8 +80,8 @@ public class IndexOperationMessage implements Serializable {
      * return the promise
      */
     @JsonIgnore
-    public BetterFuture<IndexOperationMessage> getFuture() {
-        return containerFuture;
+    public Observable<IndexOperationMessage> observable() {
+        return containerFuture.observable();
     }
 
 
@@ -117,10 +116,6 @@ public class IndexOperationMessage implements Serializable {
 
     public void done() {
         //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
-        final BetterFuture<IndexOperationMessage> future = getFuture();
-
-        if(future != null ){
-            future.done();
-        }
+        containerFuture.done();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 550b26b..3f23c11 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -137,7 +137,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity1 );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
 
 
         Entity entity2 = new Entity( entityType );
@@ -151,7 +151,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity2 );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
 
         ei.refreshAsync().toBlocking().last();
 
@@ -209,7 +209,7 @@ public class EntityIndexTest extends BaseIT {
 
                     EntityIndexBatch batch = entityIndex.createBatch();
                     insertJsonBlob( sampleJson, batch, entityType, indexEdge, size, 0 );
-                    batch.execute().get();
+                    batch.execute().toBlocking().last();
                 }
                 catch ( Exception e ) {
                     synchronized ( failTime ) {
@@ -307,7 +307,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
         entityIndexBatch.deindex( searchEdge, crs.get( 0 ) );
         entityIndexBatch.deindex( searchEdge, crs.get( 1 ) );
-        entityIndexBatch.execute().get();
+        entityIndexBatch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         //Hilda Youn
@@ -322,7 +322,7 @@ public class EntityIndexTest extends BaseIT {
         List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
         EntityIndexBatch batch = entityIndex.createBatch();
         insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  ei.refreshAsync().toBlocking().last();
         long time = info.getExecutionTime();
         log.info("refresh took ms:"+time);
@@ -387,7 +387,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
         entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) );
 
-        entityIndex.createBatch().index( searchEdge, entity ).execute().get();
+        entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         CandidateResults candidateResults = entityIndex
@@ -395,8 +395,8 @@ public class EntityIndexTest extends BaseIT {
         assertEquals( 1, candidateResults.size() );
 
         EntityIndexBatch batch = entityIndex.createBatch();
-        batch.deindex( searchEdge, entity ).execute().get();
-        batch.execute().get();
+        batch.deindex( searchEdge, entity ).execute().toBlocking().last();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         candidateResults = entityIndex
@@ -525,7 +525,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         final String query = "where username = 'edanuff'";
@@ -534,7 +534,7 @@ public class EntityIndexTest extends BaseIT {
         assertEquals( user.getId(), r.get( 0 ).getId() );
 
         batch.deindex( indexSCope, user.getId(), user.getVersion() );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         // EntityRef
@@ -595,7 +595,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
         batch.index( indexScope, fred );
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );
@@ -676,7 +676,7 @@ public class EntityIndexTest extends BaseIT {
         }
 
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
 
         ei.refreshAsync().toBlocking().last();
 
@@ -743,7 +743,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         final String query = "where searchUUID = " + searchUUID;
@@ -782,7 +782,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
         final String query = "where string = 'I am*'";
@@ -839,7 +839,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
         batch.index( indexSCope, first );
         batch.index( indexSCope, second );
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
 
@@ -904,7 +904,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second );
 
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
 
@@ -985,7 +985,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second );
 
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
 
@@ -1095,7 +1095,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second );
 
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
         ei.refreshAsync().toBlocking().last();
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd3d9395/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index 1b4b4a5..25bc48c 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -134,7 +134,7 @@ public class GeoPagingTest extends BaseIT {
 
         }
 
-        batch.execute().get();
+        batch.execute().toBlocking().last();
 
         ei.refreshAsync().toBlocking().last();
 

Reply via email to