Repository: usergrid Updated Branches: refs/heads/master 159c8c325 -> e0d6481ff
Fix buffer to use the proper interface of timespan, unit, count Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3b03a3cb Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3b03a3cb Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3b03a3cb Branch: refs/heads/master Commit: 3b03a3cbd29dbc290e197e4446285c849c4b5b85 Parents: 91739c6 Author: Michael Russo <[email protected]> Authored: Sun Feb 28 22:03:36 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Sun Feb 28 22:03:36 2016 -0800 ---------------------------------------------------------------------- .../apache/usergrid/corepersistence/index/IndexServiceImpl.java | 2 +- .../usergrid/persistence/index/impl/EsIndexProducerImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/3b03a3cb/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 7512c90..6c07212 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -106,7 +106,7 @@ public class IndexServiceImpl implements IndexService { //do our observable for batching //try to send a whole batch if we can final Observable<IndexOperationMessage> batches = sourceEdgesToIndex - .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS ) + .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() ) //map into batches based on our buffer size .flatMap( buffer -> Observable.from( buffer ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/3b03a3cb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java index 62102b4..10d5e4a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -132,7 +132,7 @@ public class EsIndexProducerImpl implements IndexProducer { //buffer into the max size we can send ES and fire them all off until we're completed final Observable<BulkRequestBuilder> requests = batchOps - .buffer(indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS) + .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize()) //flatten the buffer into a single batch execution .flatMap(individualOps -> Observable.from(individualOps)
