Author: kwall
Date: Wed Feb 3 12:47:38 2016
New Revision: 1728302
URL: http://svn.apache.org/viewvc?rev=1728302&view=rev
Log:
QPID-7024: Set the number of threads in 'group change learner' thread pool to
'group size' + 1
Improve comments/remove TODO
Avoid unnecessary resize of the pool when creating a new node of a new group
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1728302&r1=1728301&r2=1728302&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Wed Feb 3 12:47:38 2016
@@ -202,6 +202,13 @@ public class ReplicatedEnvironmentFacade
private final ExecutorService _environmentJobExecutor;
private final ListeningExecutorService _stateChangeExecutor;
+
+ /**
+ * Executor used to learn about changes in the group. Number of threads
in the pool is maintained dynamically
+ * to be number of nodes in the group + 1. This gives us sufficient
threads to 'ping' all the remote nodes in the
+ * group (in parallel), and a thread for the coordination of the pings.
We also use the executor for the
+ * transfer master operation.
+ */
private final ScheduledThreadPoolExecutor _groupChangeExecutor;
private final AtomicReference<State> _state = new
AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, ReplicationNode>
_remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
@@ -250,10 +257,10 @@ public class ReplicatedEnvironmentFacade
_defaultDurability = new
Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY,
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY,
REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
_prettyGroupNodeName = _configuration.getGroupName() + ":" +
_configuration.getName();
- // we relay on this executor being single-threaded as we need to
restart and mutate the environment in one thread
+ // we rely on this executor being single-threaded as we need to
restart and mutate the environment from one thread only
_environmentJobExecutor = Executors.newSingleThreadExecutor(new
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_stateChangeExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
- _groupChangeExecutor = new ScheduledThreadPoolExecutor(1, new
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
// create environment in a separate thread to avoid renaming of the
current thread by JE
EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -265,10 +272,14 @@ public class ReplicatedEnvironmentFacade
public void run()
{
populateExistingRemoteReplicationNodes();
- int groupSize = _remoteReplicationNodes.size() + 1;
- if (groupSize > 0)
+ int numberOfRemoteNodes = _remoteReplicationNodes.size();
+ if (numberOfRemoteNodes > 0)
{
- _groupChangeExecutor.setCorePoolSize(groupSize + 1);
+ int newPoolSize = numberOfRemoteNodes
+ + 1 /* for this node */
+ + 1 /* for coordination */;
+ _groupChangeExecutor.setCorePoolSize(newPoolSize);
+ LOGGER.debug("Setting group change executor core pool
size to {}", newPoolSize);
}
_groupChangeExecutor.submit(new RemoteNodeStateLearner());
}
@@ -1230,7 +1241,9 @@ public class ReplicatedEnvironmentFacade
public Future<Void> transferMasterAsynchronously(final String nodeName)
{
- // TODO: Should this be executed in the EnvironmentJobExecutor?
+ // Transfer master contacts the group (using GroupAdmin) to request
the mastership change.
+ // It needs to be done asynchronously but not on the
_environmentJobExecutor, as there is
+ // no point delaying transfer master because we are restarting.
return _groupChangeExecutor.submit(new Callable<Void>()
{
@Override
@@ -2065,13 +2078,13 @@ public class ReplicatedEnvironmentFacade
if (env != null)
{
ReplicationGroup group = env.getGroup();
- Set<ReplicationNode> nodes = new
HashSet<ReplicationNode>(group.getNodes());
+ Set<ReplicationNode> nodes = new HashSet<>(group.getNodes());
String localNodeName = getNodeName();
- int knownRemoteNodeNumber = _remoteReplicationNodes.size();
+ int numberOfKnownRemoteNodes = _remoteReplicationNodes.size();
int groupSize = nodes.size();
- Map<String, ReplicationNode> removalMap = new HashMap<String,
ReplicationNode>(_remoteReplicationNodes);
+ Map<String, ReplicationNode> removalMap = new
HashMap<>(_remoteReplicationNodes);
for (ReplicationNode replicationNode : nodes)
{
String discoveredNodeName = replicationNode.getName();
@@ -2125,11 +2138,11 @@ public class ReplicatedEnvironmentFacade
}
}
- if (shouldContinue && knownRemoteNodeNumber + 1 != groupSize)
+ if (shouldContinue && numberOfKnownRemoteNodes + 1 !=
groupSize)
{
- int poolSize = groupSize + 1;
- LOGGER.debug("Setting group change executor core pool size
to {}", poolSize);
- _groupChangeExecutor.setCorePoolSize(poolSize);
+ int newPoolSize = groupSize + 1;
+ LOGGER.debug("Setting group change executor core pool size
to {}", newPoolSize);
+ _groupChangeExecutor.setCorePoolSize(newPoolSize);
}
}
return shouldContinue;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]