Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev 07b83e3bf -> ae1267aae
fix observable for empty sets. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ae1267aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ae1267aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ae1267aa Branch: refs/heads/two-dot-o-dev Commit: ae1267aae5465c51593668fd0781b82ac2fdfabe Parents: 07b83e3 Author: Shawn Feldman <sfeld...@apache.org> Authored: Thu Apr 23 13:38:43 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Thu Apr 23 13:38:43 2015 -0600 ---------------------------------------------------------------------- .../index/impl/EsIndexBufferConsumerImpl.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ae1267aa/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 0577f28..3f41eb3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -125,7 +125,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { private void startSubscription() { - final Observable<IndexOperationMessage> observable = Observable.create( bufferProducer ); + final Observable<IndexOperationMessage> observable = Observable.create(bufferProducer); //buffer on our new thread with a timeout observable.buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, @@ -189,11 +189,23 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { //now that we've processed them all, ack the futures after our last batch comes through final Observable<IndexOperationMessage> processedIndexOperations = - requests.last().flatMap( lastRequest -> Observable.from( batches ) ); + requests.lastOrDefault(null).flatMap( lastRequest ->{ + if(lastRequest!=null){ + return Observable.from( batches ) ; + }else{ + return Observable.empty(); + } + }); //subscribe to the operations that generate requests on a new thread so that we can execute them quickly //mark this as done - return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done() ).doOnError( t -> log.error( "Unable to ack futures", t ) ); + return processedIndexOperations.doOnNext( processedIndexOp -> + { + processedIndexOp.done(); + } + ).doOnError(t -> { + log.error("Unable to ack futures", t); + }); }