Author: orudyy
Date: Thu Feb  4 22:35:12 2016
New Revision: 1728561

URL: http://svn.apache.org/viewvc?rev=1728561&view=rev
Log:
QPID-7024: Set the number of threads in 'group change learner' thread pool to 
'group size' + 1

           merged from trunk
           svn merge -c 1728167,1728302 
https://svn.apache.org/repos/asf/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Feb  4 22:35:12 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1728089
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1728089,1728167,1728302
 /qpid/trunk/qpid:796646-796653

Modified: 
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1728561&r1=1728560&r2=1728561&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/java/branches/6.0.x/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Thu Feb  4 22:35:12 2016
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -202,7 +202,14 @@ public class ReplicatedEnvironmentFacade
 
     private final ExecutorService _environmentJobExecutor;
     private final ListeningExecutorService _stateChangeExecutor;
-    private final ScheduledExecutorService _groupChangeExecutor;
+
+    /**
+     * Executor used to learn about changes in the group.  Number of threads 
in the pool is maintained dynammically
+     * to be number of nodes in the group + 1.   This gives us sufficient 
threads to 'ping' all the remote nodes in the
+     * group (in parallel), and a thread for the coordination of the pings.  
We also use the executor for the
+     * transfer master operation.
+     */
+    private final ScheduledThreadPoolExecutor _groupChangeExecutor;
     private final AtomicReference<State> _state = new 
AtomicReference<State>(State.OPENING);
     private final ConcurrentMap<String, ReplicationNode> 
_remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
     private final AtomicReference<ReplicationGroupListener> 
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
@@ -250,10 +257,10 @@ public class ReplicatedEnvironmentFacade
         _defaultDurability = new 
Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
         _prettyGroupNodeName = _configuration.getGroupName() + ":" + 
_configuration.getName();
 
-        // we relay on this executor being single-threaded as we need to 
restart and mutate the environment in one thread
+        // we rely on this executor being single-threaded as we need to 
restart and mutate the environment from one thread only
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
         _stateChangeExecutor = 
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
-        _groupChangeExecutor = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 
1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+        _groupChangeExecutor = new ScheduledThreadPoolExecutor(2, new 
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
         // create environment in a separate thread to avoid renaming of the 
current thread by JE
         EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -265,6 +272,15 @@ public class ReplicatedEnvironmentFacade
                 public void run()
                 {
                     populateExistingRemoteReplicationNodes();
+                    int numberOfRemoteNodes = _remoteReplicationNodes.size();
+                    if (numberOfRemoteNodes > 0)
+                    {
+                        int newPoolSize = numberOfRemoteNodes
+                                          + 1 /* for this node */
+                                          + 1 /* for coordination */;
+                        _groupChangeExecutor.setCorePoolSize(newPoolSize);
+                        LOGGER.debug("Setting group change executor core pool 
size to {}", newPoolSize);
+                    }
                     _groupChangeExecutor.submit(new RemoteNodeStateLearner());
                 }
             });
@@ -1225,7 +1241,9 @@ public class ReplicatedEnvironmentFacade
 
     public Future<Void> transferMasterAsynchronously(final String nodeName)
     {
-        // TODO: Should this be executed in the EnvironmentJobExecutor?
+        // Transfer master contacts the group (using GroupAdmin) to request 
the mastership change.
+        // It needs to be done asynchronously but not on the 
_environmentJobExecutor, as there is
+        // no point delaying transfer master because we are restarting.
         return _groupChangeExecutor.submit(new Callable<Void>()
         {
             @Override
@@ -2060,10 +2078,13 @@ public class ReplicatedEnvironmentFacade
             if (env != null)
             {
                 ReplicationGroup group = env.getGroup();
-                Set<ReplicationNode> nodes = new 
HashSet<ReplicationNode>(group.getNodes());
+                Set<ReplicationNode> nodes = new HashSet<>(group.getNodes());
                 String localNodeName = getNodeName();
 
-                Map<String, ReplicationNode> removalMap = new HashMap<String, 
ReplicationNode>(_remoteReplicationNodes);
+                int numberOfKnownRemoteNodes = _remoteReplicationNodes.size();
+                int groupSize = nodes.size();
+
+                Map<String, ReplicationNode> removalMap = new 
HashMap<>(_remoteReplicationNodes);
                 for (ReplicationNode replicationNode : nodes)
                 {
                     String discoveredNodeName = replicationNode.getName();
@@ -2116,6 +2137,13 @@ public class ReplicatedEnvironmentFacade
                         }
                     }
                 }
+
+                if (shouldContinue && numberOfKnownRemoteNodes + 1 != 
groupSize)
+                {
+                    int newPoolSize = groupSize + 1;
+                    LOGGER.debug("Setting group change executor core pool size 
to {}", newPoolSize);
+                    _groupChangeExecutor.setCorePoolSize(newPoolSize);
+                }
             }
             return shouldContinue;
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to