Author: kwall
Date: Tue Sep 23 06:55:16 2014
New Revision: 1626954
URL: http://svn.apache.org/r1626954
Log:
QPID-6111: [Java Broker] HA - Ensure that when the REF is shutdown sufficient
time is allowed to allow any in progress JE ReplicatedEnvironment to complete.
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1626954&r1=1626953&r2=1626954&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Tue Sep 23 06:55:16 2014
@@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade
private final AtomicReference<ReplicationGroupListener>
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener =
new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
+ private final ConcurrentMap<String, Database> _cachedDatabases = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences =
new ConcurrentHashMap<>();
+ private final Set<String> _permittedNodes = new
CopyOnWriteArraySet<String>();
private volatile Durability _realMessageStoreDurability = null;
+ private volatile Durability _messageStoreDurability;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private volatile Durability _messageStoreDurability;
-
- private final ConcurrentMap<String, Database> _cachedDatabases = new
ConcurrentHashMap<>();
- private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences =
new ConcurrentHashMap<>();
- private final Set<String> _permittedNodes = new
CopyOnWriteArraySet<String>();
+ private volatile long _envSetupTimeoutMillis;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration
configuration)
{
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade
LOGGER.debug("Closing replicated environment facade for "
+ _prettyGroupNodeName + " current state is " + _state.get());
}
- shutdownAndAwaitExecutorService(_environmentJobExecutor);
- shutdownAndAwaitExecutorService(_groupChangeExecutor);
+ long timeout = Math.min(_executorShutdownTimeout,
_envSetupTimeoutMillis);
+ shutdownAndAwaitExecutorService(_environmentJobExecutor,
+ timeout,
+ TimeUnit.MILLISECONDS);
+ shutdownAndAwaitExecutorService(_groupChangeExecutor,
_executorShutdownTimeout, TimeUnit.MILLISECONDS);
try
{
@@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade
}
}
- private void shutdownAndAwaitExecutorService(ExecutorService
executorService)
+ private void shutdownAndAwaitExecutorService(ExecutorService
executorService, long executorShutdownTimeout, TimeUnit timeUnit)
{
executorService.shutdown();
try
{
- boolean wasShutdown =
executorService.awaitTermination(_executorShutdownTimeout,
TimeUnit.MILLISECONDS);
+ boolean wasShutdown =
executorService.awaitTermination(executorShutdownTimeout, timeUnit);
if (!wasShutdown)
{
LOGGER.warn("Executor service " + executorService +
- " did not shutdown within allowed time period " +
_executorShutdownTimeout +
- ", ignoring");
+ " did not shutdown within allowed time period " +
_executorShutdownTimeout
+ + " " + timeUnit + ", ignoring");
}
}
catch (InterruptedException e)
@@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade
@Override
public Database openDatabase(String name, DatabaseConfig databaseConfig)
{
- LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " for " +
_prettyGroupNodeName);
+ }
if (_state.get() != State.OPEN)
{
throw new IllegalStateException("Environment facade is not in
opened state");
@@ -452,13 +458,19 @@ public class ReplicatedEnvironmentFacade
Database existingHandle = _cachedDatabases.putIfAbsent(name,
handle);
if (existingHandle == null)
{
- LOGGER.debug("openDatabase " + name + " new handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " new handle");
+ }
cachedHandle = handle;
}
else
{
- LOGGER.debug("openDatabase " + name + " existing handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " existing handle");
+ }
cachedHandle = existingHandle;
handle.close();
}
@@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade
@Override
public ReplicatedEnvironment call() throws Exception
{
- String originalThreadName = Thread.currentThread().getName();
- try
- {
- return createEnvironment(environmentPathFile, envConfig,
replicationConfig);
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
+ return createEnvironment(environmentPathFile, envConfig,
replicationConfig);
}});
- long setUpTimeOutMillis =
PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ long setUpTimeOutMillis =
extractEnvSetupTimeoutMillis(replicationConfig);
try
{
return environmentFuture.get(setUpTimeOutMillis,
TimeUnit.MILLISECONDS);
@@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade
}
catch (TimeoutException e)
{
- throw new RuntimeException("JE environment has not been created in
due time");
+ throw new RuntimeException("JE replicated environment creation
took too long (permitted time "
+ + setUpTimeOutMillis + "ms)");
}
}
@@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade
final ReplicationConfig replicationConfig)
{
ReplicatedEnvironment environment = null;
+
+ String originalThreadName = Thread.currentThread().getName();
try
{
+ _envSetupTimeoutMillis =
extractEnvSetupTimeoutMillis(replicationConfig);
environment = new ReplicatedEnvironment(environmentPathFile,
replicationConfig, envConfig);
}
catch (final InsufficientLogException ile)
{
- LOGGER.info("InsufficientLogException thrown and so full network
restore required", ile);
+ LOGGER.warn("The log files of this node are too old. Network
restore will begin now.", ile);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(ile, config);
+ LOGGER.warn("Network restore complete.");
environment = new ReplicatedEnvironment(environmentPathFile,
replicationConfig, envConfig);
}
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Environment is created for node " +
_prettyGroupNodeName);
@@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade
return environment;
}
+ private long extractEnvSetupTimeoutMillis(ReplicationConfig
replicationConfig)
+ {
+ return (long)
PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ }
+
public int getNumberOfElectableGroupMembers()
{
if (_state.get() != State.OPEN)
@@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade
}
else
{
- LOGGER.warn(String.format("Found an intruder node '%s' from ''%s'
. The node is not listed in permitted list: %s",
- replicationNode.getName(), getHostPort(replicationNode),
String.valueOf(_permittedNodes)));
+ LOGGER.warn(String.format(
+ "Found an intruder node '%s' from ''%s' . The node is not
listed in permitted list: %s",
+ replicationNode.getName(),
+ getHostPort(replicationNode),
+ String.valueOf(_permittedNodes)));
return true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]