Author: kwall
Date: Wed Feb  3 12:47:38 2016
New Revision: 1728302

URL: http://svn.apache.org/viewvc?rev=1728302&view=rev
Log:
QPID-7024: Set the number of threads in 'group change learner' thread pool to 
'group size' + 1

Improve comments/remove TODO
Avoid unnecessary resize of the pool when creating a new node of a new group

Modified:
    
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Modified: 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1728302&r1=1728301&r2=1728302&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Wed Feb  3 12:47:38 2016
@@ -202,6 +202,13 @@ public class ReplicatedEnvironmentFacade
 
     private final ExecutorService _environmentJobExecutor;
     private final ListeningExecutorService _stateChangeExecutor;
+
+    /**
+     * Executor used to learn about changes in the group.  Number of threads 
in the pool is maintained dynamically
+     * to be the number of nodes in the group + 1.   This gives us sufficient 
threads to 'ping' all the remote nodes in the
+     * group (in parallel), and a thread for the coordination of the pings.  
We also use the executor for the
+     * transfer master operation.
+     */
     private final ScheduledThreadPoolExecutor _groupChangeExecutor;
     private final AtomicReference<State> _state = new 
AtomicReference<State>(State.OPENING);
     private final ConcurrentMap<String, ReplicationNode> 
_remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
@@ -250,10 +257,10 @@ public class ReplicatedEnvironmentFacade
         _defaultDurability = new 
Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
         _prettyGroupNodeName = _configuration.getGroupName() + ":" + 
_configuration.getName();
 
-        // we relay on this executor being single-threaded as we need to 
restart and mutate the environment in one thread
+        // we rely on this executor being single-threaded as we need to 
restart and mutate the environment from one thread only
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
         _stateChangeExecutor = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
-        _groupChangeExecutor = new ScheduledThreadPoolExecutor(1, new 
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+        _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new 
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
         // create environment in a separate thread to avoid renaming of the 
current thread by JE
         EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -265,10 +272,14 @@ public class ReplicatedEnvironmentFacade
                 public void run()
                 {
                     populateExistingRemoteReplicationNodes();
-                    int groupSize = _remoteReplicationNodes.size() + 1;
-                    if (groupSize > 0)
+                    int numberOfRemoteNodes = _remoteReplicationNodes.size();
+                    if (numberOfRemoteNodes > 0)
                     {
-                        _groupChangeExecutor.setCorePoolSize(groupSize + 1);
+                        int newPoolSize = numberOfRemoteNodes
+                                          + 1 /* for this node */
+                                          + 1 /* for coordination */;
+                        _groupChangeExecutor.setCorePoolSize(newPoolSize);
+                        LOGGER.debug("Setting group change executor core pool 
size to {}", newPoolSize);
                     }
                     _groupChangeExecutor.submit(new RemoteNodeStateLearner());
                 }
@@ -1230,7 +1241,9 @@ public class ReplicatedEnvironmentFacade
 
     public Future<Void> transferMasterAsynchronously(final String nodeName)
     {
-        // TODO: Should this be executed in the EnvironmentJobExecutor?
+        // Transfer master contacts the group (using GroupAdmin) to request 
the mastership change.
+        // It needs to be done asynchronously but not on the 
_environmentJobExecutor, as there is
+        // no point delaying transfer master because we are restarting.
         return _groupChangeExecutor.submit(new Callable<Void>()
         {
             @Override
@@ -2065,13 +2078,13 @@ public class ReplicatedEnvironmentFacade
             if (env != null)
             {
                 ReplicationGroup group = env.getGroup();
-                Set<ReplicationNode> nodes = new 
HashSet<ReplicationNode>(group.getNodes());
+                Set<ReplicationNode> nodes = new HashSet<>(group.getNodes());
                 String localNodeName = getNodeName();
 
-                int knownRemoteNodeNumber = _remoteReplicationNodes.size();
+                int numberOfKnownRemoteNodes = _remoteReplicationNodes.size();
                 int groupSize = nodes.size();
 
-                Map<String, ReplicationNode> removalMap = new HashMap<String, 
ReplicationNode>(_remoteReplicationNodes);
+                Map<String, ReplicationNode> removalMap = new 
HashMap<>(_remoteReplicationNodes);
                 for (ReplicationNode replicationNode : nodes)
                 {
                     String discoveredNodeName = replicationNode.getName();
@@ -2125,11 +2138,11 @@ public class ReplicatedEnvironmentFacade
                     }
                 }
 
-                if (shouldContinue && knownRemoteNodeNumber + 1 != groupSize)
+                if (shouldContinue && numberOfKnownRemoteNodes + 1 != 
groupSize)
                 {
-                    int poolSize = groupSize + 1;
-                    LOGGER.debug("Setting group change executor core pool size 
to {}", poolSize);
-                    _groupChangeExecutor.setCorePoolSize(poolSize);
+                    int newPoolSize = groupSize + 1;
+                    LOGGER.debug("Setting group change executor core pool size 
to {}", newPoolSize);
+                    _groupChangeExecutor.setCorePoolSize(newPoolSize);
                 }
             }
             return shouldContinue;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to