Fixes issue with throwable not being caught in onSubscribe function
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d1c8b8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d1c8b8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d1c8b8a Branch: refs/heads/two-dot-o_exception_verification Commit: 2d1c8b8ac7b20b63a11d83adca56839d8b409cca Parents: beb2a2a Author: Todd Nine <[email protected]> Authored: Thu Mar 26 09:47:58 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Thu Mar 26 09:47:58 2015 -0600 ---------------------------------------------------------------------- .../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++++------------- 1 file changed, 14 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d1c8b8a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java index 7e64de3..d064b97 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java @@ -153,12 +153,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { //name our thread so it's easy to see Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - List<IndexOperationMessage> drainList; + + List<IndexOperationMessage> drainList = null; + do { - try { + Timer.Context timer = produceTimer.time(); + + + try { - Timer.Context timer = produceTimer.time(); drainList = bufferQueue .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), @@ -174,10 +178,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { timer.stop(); } - catch ( Exception e ) { + catch ( Throwable t ) { final long sleepTime = config.getFailureRetryTime(); - log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, e ); + log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t ); + + if ( drainList != null ) { + inFlight.addAndGet( -1 * drainList.size() ); + } + try { Thread.sleep( sleepTime ); @@ -216,26 +225,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer { inFlight.addAndGet( -1 * indexOperationMessages.size() ); } } ) - //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die - .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() { - @Override - public List<IndexOperationMessage> call( final Throwable throwable ) { - final long sleepTime = config.getFailureRetryTime(); - - log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, throwable ); - - try { - Thread.sleep( sleepTime ); - } - catch ( InterruptedException ie ) { - //swallow - } - - indexErrorCounter.inc(); - - return Collections.EMPTY_LIST; - } - } ) .subscribeOn( Schedulers.newThread() );
