Author: kwall
Date: Tue Sep 23 06:55:16 2014
New Revision: 1626954

URL: http://svn.apache.org/r1626954
Log:
QPID-6111: [Java Broker] HA - Ensure that when the REF is shutdown sufficient 
time is allowed to allow any in progress JE ReplicatedEnvironment to complete.

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626954&r1=1626953&r2=1626954&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Tue Sep 23 06:55:16 2014
@@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<ReplicationGroupListener> 
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = 
new AtomicReference<StateChangeListener>();
     private final Durability _defaultDurability;
+    private final ConcurrentMap<String, Database> _cachedDatabases = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = 
new ConcurrentHashMap<>();
+    private final Set<String> _permittedNodes = new 
CopyOnWriteArraySet<String>();
 
     private volatile Durability _realMessageStoreDurability = null;
+    private volatile Durability _messageStoreDurability;
     private volatile CoalescingCommiter _coalescingCommiter = null;
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
-    private volatile Durability _messageStoreDurability;
-
-    private final ConcurrentMap<String, Database> _cachedDatabases = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = 
new ConcurrentHashMap<>();
-    private final Set<String> _permittedNodes = new 
CopyOnWriteArraySet<String>();
+    private volatile long _envSetupTimeoutMillis;
 
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration 
configuration)
     {
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade
                     LOGGER.debug("Closing replicated environment facade for " 
+ _prettyGroupNodeName + " current state is " + _state.get());
                 }
 
-                shutdownAndAwaitExecutorService(_environmentJobExecutor);
-                shutdownAndAwaitExecutorService(_groupChangeExecutor);
+                long timeout = Math.min(_executorShutdownTimeout, 
_envSetupTimeoutMillis);
+                shutdownAndAwaitExecutorService(_environmentJobExecutor,
+                                                timeout,
+                                                TimeUnit.MILLISECONDS);
+                shutdownAndAwaitExecutorService(_groupChangeExecutor, 
_executorShutdownTimeout, TimeUnit.MILLISECONDS);
 
                 try
                 {
@@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void shutdownAndAwaitExecutorService(ExecutorService 
executorService)
+    private void shutdownAndAwaitExecutorService(ExecutorService 
executorService, long executorShutdownTimeout, TimeUnit timeUnit)
     {
         executorService.shutdown();
         try
         {
-            boolean wasShutdown = 
executorService.awaitTermination(_executorShutdownTimeout, 
TimeUnit.MILLISECONDS);
+            boolean wasShutdown = 
executorService.awaitTermination(executorShutdownTimeout, timeUnit);
             if (!wasShutdown)
             {
                 LOGGER.warn("Executor service " + executorService +
-                            " did not shutdown within allowed time period " + 
_executorShutdownTimeout +
-                            ", ignoring");
+                            " did not shutdown within allowed time period " + 
_executorShutdownTimeout
+                            + " " + timeUnit + ", ignoring");
             }
         }
         catch (InterruptedException e)
@@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Database openDatabase(String name, DatabaseConfig databaseConfig)
     {
-        LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("openDatabase " + name + " for " + 
_prettyGroupNodeName);
+        }
         if (_state.get() != State.OPEN)
         {
             throw new IllegalStateException("Environment facade is not in 
opened state");
@@ -452,13 +458,19 @@ public class ReplicatedEnvironmentFacade
             Database existingHandle = _cachedDatabases.putIfAbsent(name, 
handle);
             if (existingHandle == null)
             {
-                LOGGER.debug("openDatabase " + name + " new handle");
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("openDatabase " + name + " new handle");
+                }
 
                 cachedHandle = handle;
             }
             else
             {
-                LOGGER.debug("openDatabase " + name + " existing handle");
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("openDatabase " + name + " existing handle");
+                }
                 cachedHandle = existingHandle;
                 handle.close();
             }
@@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade
             @Override
             public ReplicatedEnvironment call() throws Exception
             {
-                String originalThreadName = Thread.currentThread().getName();
-                try
-                {
-                    return createEnvironment(environmentPathFile, envConfig, 
replicationConfig);
-                }
-                finally
-                {
-                    Thread.currentThread().setName(originalThreadName);
-                }
+                return createEnvironment(environmentPathFile, envConfig, 
replicationConfig);
             }});
 
-        long setUpTimeOutMillis = 
PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+        long setUpTimeOutMillis = 
extractEnvSetupTimeoutMillis(replicationConfig);
         try
         {
             return environmentFuture.get(setUpTimeOutMillis, 
TimeUnit.MILLISECONDS);
@@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException("JE environment has not been created in 
due time");
+            throw new RuntimeException("JE replicated environment creation 
took too long (permitted time "
+                                       + setUpTimeOutMillis + "ms)");
         }
     }
 
@@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade
             final ReplicationConfig replicationConfig)
     {
         ReplicatedEnvironment environment = null;
+
+        String originalThreadName = Thread.currentThread().getName();
         try
         {
+            _envSetupTimeoutMillis = 
extractEnvSetupTimeoutMillis(replicationConfig);
             environment = new ReplicatedEnvironment(environmentPathFile, 
replicationConfig, envConfig);
         }
         catch (final InsufficientLogException ile)
         {
-            LOGGER.info("InsufficientLogException thrown and so full network 
restore required", ile);
+            LOGGER.warn("The log files of this node are too old. Network 
restore will begin now.", ile);
             NetworkRestore restore = new NetworkRestore();
             NetworkRestoreConfig config = new NetworkRestoreConfig();
             config.setRetainLogFiles(false);
             restore.execute(ile, config);
+            LOGGER.warn("Network restore complete.");
             environment = new ReplicatedEnvironment(environmentPathFile, 
replicationConfig, envConfig);
         }
+        finally
+        {
+            Thread.currentThread().setName(originalThreadName);
+        }
+
         if (LOGGER.isInfoEnabled())
         {
             LOGGER.info("Environment is created for node " + 
_prettyGroupNodeName);
@@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
+    private long extractEnvSetupTimeoutMillis(ReplicationConfig 
replicationConfig)
+    {
+        return (long) 
PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+    }
+
     public int getNumberOfElectableGroupMembers()
     {
         if (_state.get() != State.OPEN)
@@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade
         }
         else
         {
-            LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' 
. The node is not listed in permitted list: %s",
-                    replicationNode.getName(), getHostPort(replicationNode), 
String.valueOf(_permittedNodes)));
+            LOGGER.warn(String.format(
+                    "Found an intruder node '%s' from ''%s' . The node is not 
listed in permitted list: %s",
+                    replicationNode.getName(),
+                    getHostPort(replicationNode),
+                    String.valueOf(_permittedNodes)));
             return true;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to