Repository: usergrid Updated Branches: refs/heads/master f49fe0a60 -> 0ada637fe
Add retry logic to initial session connections to the database. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0ada637f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0ada637f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0ada637f Branch: refs/heads/master Commit: 0ada637febb50fe04415cfb1fc6015818b7aa0a8 Parents: f49fe0a Author: Michael Russo <[email protected]> Authored: Sun Nov 20 10:39:49 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Sun Nov 20 10:39:49 2016 -0800 ---------------------------------------------------------------------- .../core/datastax/impl/DataStaxClusterImpl.java | 58 +++++++++++++++++++- 1 file changed, 55 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0ada637f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java index a318c3c..d49737d 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java @@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.core.datastax.impl; import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.Policies; @@ -76,7 +77,24 @@ public class DataStaxClusterImpl implements DataStaxCluster { // always grab cluster from getCluster() in case it was prematurely closed if ( clusterSession == null || clusterSession.isClosed() ){ - clusterSession = getCluster().connect(); + int retries = 3; + int retryCount = 0; + while ( retryCount < retries){ + try{ + retryCount++; + clusterSession = getCluster().connect(); + break; + }catch(NoHostAvailableException e){ + if(retryCount == retries){ + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // swallow + } + } + } } return clusterSession; @@ -87,7 +105,24 @@ public class DataStaxClusterImpl implements DataStaxCluster { // always grab cluster from getCluster() in case it was prematurely closed if ( applicationSession == null || applicationSession.isClosed() ){ - applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) ); + int retries = 3; + int retryCount = 0; + while ( retryCount < retries){ + try{ + retryCount++; + applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) ); + break; + }catch(NoHostAvailableException e){ + if(retryCount == retries){ + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // swallow + } + } + } } return applicationSession; } @@ -98,7 +133,24 @@ public class DataStaxClusterImpl implements DataStaxCluster { // always grab cluster from getCluster() in case it was prematurely closed if ( queueMessageSession == null || queueMessageSession.isClosed() ){ - queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) ); + int retries = 3; + int retryCount = 0; + while ( retryCount < retries){ + try{ + retryCount++; + queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) ); + break; + }catch(NoHostAvailableException e){ + if(retryCount == retries){ + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + // swallow + } + } + } } return queueMessageSession; }
