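Fix observable for empty sets: use lastOrDefault(null) instead of last() so an empty request stream yields an empty result rather than an error.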

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae1267aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae1267aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae1267aa

Branch: refs/heads/USERGRID-593
Commit: ae1267aae5465c51593668fd0781b82ac2fdfabe
Parents: 07b83e3
Author: Shawn Feldman <sfeld...@apache.org>
Authored: Thu Apr 23 13:38:43 2015 -0600
Committer: Shawn Feldman <sfeld...@apache.org>
Committed: Thu Apr 23 13:38:43 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java     | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae1267aa/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 0577f28..3f41eb3 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -125,7 +125,7 @@ public class EsIndexBufferConsumerImpl implements 
IndexBufferConsumer {
     private void startSubscription() {
 
 
-        final Observable<IndexOperationMessage> observable = 
Observable.create( bufferProducer );
+        final Observable<IndexOperationMessage> observable = 
Observable.create(bufferProducer);
 
         //buffer on our new thread with a timeout
         observable.buffer( indexFig.getIndexBufferSize(), 
indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
@@ -189,11 +189,23 @@ public class EsIndexBufferConsumerImpl implements 
IndexBufferConsumer {
 
         //now that we've processed them all, ack the futures after our last 
batch comes through
         final Observable<IndexOperationMessage> processedIndexOperations =
-            requests.last().flatMap( lastRequest -> Observable.from( batches ) 
);
+            requests.lastOrDefault(null).flatMap( lastRequest ->{
+                if(lastRequest!=null){
+                    return Observable.from( batches ) ;
+                }else{
+                    return Observable.empty();
+                }
+            });
 
         //subscribe to the operations that generate requests on a new thread 
so that we can execute them quickly
         //mark this as done
-        return processedIndexOperations.doOnNext( processedIndexOp -> 
processedIndexOp.done() ).doOnError( t -> log.error( "Unable to ack futures", t 
) );
+        return processedIndexOperations.doOnNext( processedIndexOp ->
+            {
+                processedIndexOp.done();
+            }
+        ).doOnError(t -> {
+            log.error("Unable to ack futures", t);
+        });
     }
 
 

Reply via email to