Author: orudyy
Date: Thu Feb 4 22:35:12 2016
New Revision: 1728561
URL: http://svn.apache.org/viewvc?rev=1728561&view=rev
Log:
QPID-7024: Set the number of threads in 'group change learner' thread pool to
'group size' + 1
merged from trunk
svn merge -c 1728167,1728302
https://svn.apache.org/repos/asf/qpid/java/trunk
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb 4 22:35:12 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1728089
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1728089,1728167,1728302
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1728561&r1=1728560&r2=1728561&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Thu Feb 4 22:35:12 2016
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -202,7 +202,14 @@ public class ReplicatedEnvironmentFacade
private final ExecutorService _environmentJobExecutor;
private final ListeningExecutorService _stateChangeExecutor;
- private final ScheduledExecutorService _groupChangeExecutor;
+
+ /**
+ * Executor used to learn about changes in the group. Number of threads
in the pool is maintained dynammically
+ * to be number of nodes in the group + 1. This gives us sufficient
threads to 'ping' all the remote nodes in the
+ * group (in parallel), and a thread for the coordination of the pings.
We also use the executor for the
+ * transfer master operation.
+ */
+ private final ScheduledThreadPoolExecutor _groupChangeExecutor;
private final AtomicReference<State> _state = new
AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, ReplicationNode>
_remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
private final AtomicReference<ReplicationGroupListener>
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
@@ -250,10 +257,10 @@ public class ReplicatedEnvironmentFacade
_defaultDurability = new
Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY,
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY,
REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
_prettyGroupNodeName = _configuration.getGroupName() + ":" +
_configuration.getName();
- // we relay on this executor being single-threaded as we need to
restart and mutate the environment in one thread
+ // we rely on this executor being single-threaded as we need to
restart and mutate the environment from one thread only
_environmentJobExecutor = Executors.newSingleThreadExecutor(new
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_stateChangeExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
- _groupChangeExecutor =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() +
1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
// create environment in a separate thread to avoid renaming of the
current thread by JE
EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -265,6 +272,15 @@ public class ReplicatedEnvironmentFacade
public void run()
{
populateExistingRemoteReplicationNodes();
+ int numberOfRemoteNodes = _remoteReplicationNodes.size();
+ if (numberOfRemoteNodes > 0)
+ {
+ int newPoolSize = numberOfRemoteNodes
+ + 1 /* for this node */
+ + 1 /* for coordination */;
+ _groupChangeExecutor.setCorePoolSize(newPoolSize);
+ LOGGER.debug("Setting group change executor core pool
size to {}", newPoolSize);
+ }
_groupChangeExecutor.submit(new RemoteNodeStateLearner());
}
});
@@ -1225,7 +1241,9 @@ public class ReplicatedEnvironmentFacade
public Future<Void> transferMasterAsynchronously(final String nodeName)
{
- // TODO: Should this be executed in the EnvironmentJobExecutor?
+ // Transfer master contacts the group (using GroupAdmin) to request
the mastership change.
+ // It needs to be done asynchronously but not on the
_environmentJobExecutor, as there is
+ // no point delaying transfer master because we are restarting.
return _groupChangeExecutor.submit(new Callable<Void>()
{
@Override
@@ -2060,10 +2078,13 @@ public class ReplicatedEnvironmentFacade
if (env != null)
{
ReplicationGroup group = env.getGroup();
- Set<ReplicationNode> nodes = new
HashSet<ReplicationNode>(group.getNodes());
+ Set<ReplicationNode> nodes = new HashSet<>(group.getNodes());
String localNodeName = getNodeName();
- Map<String, ReplicationNode> removalMap = new HashMap<String,
ReplicationNode>(_remoteReplicationNodes);
+ int numberOfKnownRemoteNodes = _remoteReplicationNodes.size();
+ int groupSize = nodes.size();
+
+ Map<String, ReplicationNode> removalMap = new
HashMap<>(_remoteReplicationNodes);
for (ReplicationNode replicationNode : nodes)
{
String discoveredNodeName = replicationNode.getName();
@@ -2116,6 +2137,13 @@ public class ReplicatedEnvironmentFacade
}
}
}
+
+ if (shouldContinue && numberOfKnownRemoteNodes + 1 !=
groupSize)
+ {
+ int newPoolSize = groupSize + 1;
+ LOGGER.debug("Setting group change executor core pool size
to {}", newPoolSize);
+ _groupChangeExecutor.setCorePoolSize(newPoolSize);
+ }
}
return shouldContinue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]