JAMES-1999 Elasticsearch does not retry to connect when error

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d258d900
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d258d900
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d258d900

Branch: refs/heads/master
Commit: d258d900fea5c220480bcaad095942ecd59d2b12
Parents: 6c5912f
Author: quynhn <qngu...@linagora.com>
Authored: Fri Apr 14 09:48:31 2017 +0700
Committer: benwa <btell...@linagora.com>
Committed: Fri Apr 21 07:52:19 2017 +0700

----------------------------------------------------------------------
 .../modules/mailbox/CassandraSessionModule.java | 43 ++++++++++++--------
 .../mailbox/ElasticSearchMailboxModule.java     | 26 ++++++------
 2 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
index 8146c5a..8b92f99 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSessionModule.java
@@ -29,6 +29,7 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+import com.nurkiewicz.asyncretry.function.RetryCallable;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -40,6 +41,9 @@ import org.apache.james.backends.cassandra.init.QueryLoggerConfiguration;
 import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
 import org.apache.james.filesystem.api.FileSystem;
 import org.apache.james.util.Host;
+import org.apache.james.utils.RetryExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.util.Arrays;
@@ -51,6 +55,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class CassandraSessionModule extends AbstractModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSessionModule.class);
 
     private static final int DEFAULT_CONNECTION_MAX_RETRIES = 10;
     private static final int DEFAULT_CONNECTION_MIN_DELAY = 5000;
@@ -91,19 +96,30 @@ public class CassandraSessionModule extends AbstractModule {
         List<Host> servers = listCassandraServers(configuration);
         QueryLoggerConfiguration queryLoggerConfiguration = getCassandraQueryLoggerConf(configuration);
 
-        return getRetryer(executor, configuration)
-                .getWithRetry(ctx -> ClusterWithKeyspaceCreatedFactory
-                        .config(
-                            ClusterBuilder.builder()
-                                .servers(servers)
-                                .queryLoggerConfiguration(queryLoggerConfiguration)
-                                .build(),
-                            configuration.getString("cassandra.keyspace"))
-                        .replicationFactor(configuration.getInt("cassandra.replication.factor"))
-                        .clusterWithInitializedKeyspace())
+        int maxRetries = configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+        int minDelay = configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
+
+        return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoHostAvailableException.class)
+                .getWithRetry(getClusterRetryCallable(configuration, servers, queryLoggerConfiguration))
                 .get();
     }
 
+    private RetryCallable<Cluster> getClusterRetryCallable(PropertiesConfiguration configuration, List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+        LOGGER.info("Trying to connect to Cassandra service");
+
+        return context -> ClusterWithKeyspaceCreatedFactory
+            .config(getCluster(servers, queryLoggerConfiguration), configuration.getString("cassandra.keyspace"))
+            .replicationFactor(configuration.getInt("cassandra.replication.factor"))
+            .clusterWithInitializedKeyspace();
+    }
+
+    private Cluster getCluster(List<Host> servers, QueryLoggerConfiguration queryLoggerConfiguration) {
+        return ClusterBuilder.builder()
+            .servers(servers)
+            .queryLoggerConfiguration(queryLoggerConfiguration)
+            .build();
+    }
+
     private List<Host> listCassandraServers(PropertiesConfiguration configuration) {
         String[] ipAndPorts = configuration.getStringArray("cassandra.nodes");
 
@@ -154,13 +170,6 @@ public class CassandraSessionModule extends AbstractModule {
         return builder.build();
     }
 
-    private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
-        return executor.retryOn(NoHostAvailableException.class)
-                .withProportionalJitter()
-                .withMaxRetries(configuration.getInt("cassandra.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
-                .withMinDelay(configuration.getInt("cassandra.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
-    }
-
     @Provides
     private AsyncRetryExecutor provideAsyncRetryExecutor(ScheduledExecutorService scheduler) {
         return new AsyncRetryExecutor(scheduler);

http://git-wip-us.apache.org/repos/asf/james-project/blob/d258d900/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
----------------------------------------------------------------------
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index 4c3c9ef..48c4145 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -42,8 +42,11 @@ import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.apache.james.mailbox.tika.extractor.TikaTextExtractor;
+import org.apache.james.utils.RetryExecutorUtil;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.NoNodeAvailableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.AbstractModule;
@@ -52,6 +55,7 @@ import com.google.inject.Scopes;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
 public class ElasticSearchMailboxModule extends AbstractModule {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchMailboxModule.class);
 
     public static final String ES_CONFIG_FILE = FileSystem.FILE_PROTOCOL_AND_CONF + "elasticsearch.properties";
     public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
@@ -77,11 +81,17 @@ public class ElasticSearchMailboxModule extends AbstractModule {
     @Singleton
     protected Client provideClientProvider(FileSystem fileSystem, AsyncRetryExecutor executor) throws ConfigurationException, FileNotFoundException, ExecutionException, InterruptedException {
         PropertiesConfiguration propertiesReader = new PropertiesConfiguration(fileSystem.getFile(ES_CONFIG_FILE));
+        int maxRetries = propertiesReader.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES);
+        int minDelay = propertiesReader.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY);
 
-        ClientProvider clientProvider = connectToCluster(propertiesReader);
+        return RetryExecutorUtil.retryOnExceptions(executor, maxRetries, minDelay, NoNodeAvailableException.class)
+            .getWithRetry(context -> connectToCluster(propertiesReader))
+            .get();
+    }
 
-        Client client = getRetryer(executor, propertiesReader)
-                .getWithRetry(ctx -> clientProvider.get()).get();
+    private Client createClientAndIndex(ClientProvider clientProvider, PropertiesConfiguration propertiesReader) {
+        LOGGER.info("Trying to connect to ElasticSearch service");
+        Client client = clientProvider.get();
         IndexCreationFactory.createIndex(client,
             MailboxElasticsearchConstants.MAILBOX_INDEX,
             propertiesReader.getInt("elasticsearch.nb.shards"),
@@ -122,15 +132,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
         }
     }
 
-    private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
-        return executor
-                .withProportionalJitter()
-                .retryOn(NoNodeAvailableException.class)
-                .withMaxRetries(configuration.getInt("elasticsearch.retryConnection.maxRetries", DEFAULT_CONNECTION_MAX_RETRIES))
-                .withMinDelay(configuration.getInt("elasticsearch.retryConnection.minDelay", DEFAULT_CONNECTION_MIN_DELAY));
-    }
-
-    @Provides 
+    @Provides
     @Singleton
     public IndexAttachments provideIndexAttachments(PropertiesConfiguration configuration) {
         if (configuration.getBoolean("elasticsearch.indexAttachments", DEFAULT_INDEX_ATTACHMENTS)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to