Author: orudyy
Date: Tue Feb 2 18:09:57 2016
New Revision: 1728167
URL: http://svn.apache.org/viewvc?rev=1728167&view=rev
Log:
QPID-7024: Set the number of threads in 'group change learner' thread pool to
'group size' + 1
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1728167&r1=1728166&r2=1728167&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Tue Feb 2 18:09:57 2016
@@ -40,7 +40,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -202,7 +202,7 @@ public class ReplicatedEnvironmentFacade
private final ExecutorService _environmentJobExecutor;
private final ListeningExecutorService _stateChangeExecutor;
- private final ScheduledExecutorService _groupChangeExecutor;
+ private final ScheduledThreadPoolExecutor _groupChangeExecutor;
private final AtomicReference<State> _state = new
AtomicReference<State>(State.OPENING);
private final ConcurrentMap<String, ReplicationNode>
_remoteReplicationNodes = new ConcurrentHashMap<String, ReplicationNode>();
private final AtomicReference<ReplicationGroupListener>
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
@@ -253,7 +253,7 @@ public class ReplicatedEnvironmentFacade
// we rely on this executor being single-threaded as we need to
restart and mutate the environment in one thread
_environmentJobExecutor = Executors.newSingleThreadExecutor(new
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
_stateChangeExecutor =
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
DaemonThreadFactory("StateChange-" + _prettyGroupNodeName)));
- _groupChangeExecutor =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() +
1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
+ _groupChangeExecutor = new ScheduledThreadPoolExecutor(1, new
DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
// create environment in a separate thread to avoid renaming of the
current thread by JE
EnvHomeRegistry.getInstance().registerHome(_environmentDirectory);
@@ -265,6 +265,11 @@ public class ReplicatedEnvironmentFacade
public void run()
{
populateExistingRemoteReplicationNodes();
+ int groupSize = _remoteReplicationNodes.size() + 1;
+ if (groupSize > 0)
+ {
+ _groupChangeExecutor.setCorePoolSize(groupSize + 1);
+ }
_groupChangeExecutor.submit(new RemoteNodeStateLearner());
}
});
@@ -2063,6 +2068,9 @@ public class ReplicatedEnvironmentFacade
Set<ReplicationNode> nodes = new
HashSet<ReplicationNode>(group.getNodes());
String localNodeName = getNodeName();
+ int knownRemoteNodeNumber = _remoteReplicationNodes.size();
+ int groupSize = nodes.size();
+
Map<String, ReplicationNode> removalMap = new HashMap<String,
ReplicationNode>(_remoteReplicationNodes);
for (ReplicationNode replicationNode : nodes)
{
@@ -2116,6 +2124,13 @@ public class ReplicatedEnvironmentFacade
}
}
}
+
+ if (shouldContinue && knownRemoteNodeNumber + 1 != groupSize)
+ {
+ int poolSize = groupSize + 1;
+ LOGGER.debug("Setting group change executor core pool size
to {}", poolSize);
+ _groupChangeExecutor.setCorePoolSize(poolSize);
+ }
}
return shouldContinue;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]