Repository: incubator-usergrid Updated Branches: refs/heads/subscription-bugfixes [created] 3f821f580
Fixes subscription death on error Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f821f58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f821f58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f821f58 Branch: refs/heads/subscription-bugfixes Commit: 3f821f5803c467b45148c51ba192fd957146dc1e Parents: e3a4a95 Author: Todd Nine <tn...@apigee.com> Authored: Wed Aug 12 14:59:46 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Wed Aug 12 14:59:46 2015 -0600 ---------------------------------------------------------------------- .../index/impl/EsIndexBufferConsumerImpl.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f821f58/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 457a900..ed70c62 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -177,7 +177,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { batchOperation.doOperation( client, bulkRequestBuilder ); } ) ) //write them - .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).doOnError( t -> log.error( "Unable to process batches", t ) ); + .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ); //now that we've processed them all, ack the futures after our last batch comes through @@ -194,9 +194,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { //mark this as done return processedIndexOperations.doOnNext( processedIndexOp -> { processedIndexOp.done(); - roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime()); - } - ).doOnError(t -> log.error("Unable to ack futures", t)); + roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() ); + } ); } @@ -270,7 +269,11 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { * Send the data through the buffer */ public void send( final IndexOperationMessage indexOp ) { - subscriber.onNext( indexOp ); + try { + subscriber.onNext( indexOp ); + }catch(Exception e){ + log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e ); + } }