Updated Branches: refs/heads/master 23ddf2953 -> 66185076d
Added back Agent Load Balancing functionality (was temporarily disabled in master by vmSync merge) Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/66185076 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/66185076 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/66185076 Branch: refs/heads/master Commit: 66185076df30d1180d402b678c612c223ddee832 Parents: 23ddf29 Author: Alena Prokharchyk <[email protected]> Authored: Mon Oct 7 09:46:03 2013 -0700 Committer: Alena Prokharchyk <[email protected]> Committed: Mon Oct 7 09:46:52 2013 -0700 ---------------------------------------------------------------------- .../manager/ClusteredAgentManagerImpl.java | 103 ++++++++++--------- 1 file changed, 54 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/66185076/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 48f096a..2b9e541 100755 --- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -43,16 +43,13 @@ import javax.naming.ConfigurationException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; - -import com.google.gson.Gson; - import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.managed.context.ManagedContextRunnable; import 
org.apache.cloudstack.managed.context.ManagedContextTimerTask; import org.apache.cloudstack.utils.identity.ManagementServerNode; +import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; @@ -86,7 +83,6 @@ import com.cloud.host.Status.Event; import com.cloud.resource.ServerResource; import com.cloud.serializer.GsonHelper; import com.cloud.utils.DateUtil; -import com.cloud.utils.Profiler; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.QueryBuilder; import com.cloud.utils.db.SearchCriteria.Op; @@ -94,36 +90,35 @@ import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Task; +import com.google.gson.Gson; @Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class }) public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class); - private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); + private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list public final static long STARTUP_DELAY = 5000; public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds protected Set<Long> _agentToTransferIds = new HashSet<Long>(); - Gson _gson; - - @Inject - protected ClusterManager _clusterMgr = null; - protected HashMap<String, SocketChannel> _peers; protected HashMap<String, SSLEngine> _sslEngines; 
private final Timer _timer = new Timer("ClusteredAgentManager Timer"); - + private final Timer _agentLbTimer = new Timer("ClusteredAgentManager AgentRebalancing Timer"); + boolean _agentLbHappened = false; + + @Inject + protected ClusterManager _clusterMgr = null; @Inject protected ManagementServerHostDao _mshostDao; @Inject protected HostTransferMapDao _hostTransferDao; - - // @com.cloud.utils.component.Inject(adapter = AgentLoadBalancerPlanner.class) - @Inject protected List<AgentLoadBalancerPlanner> _lbPlanners; - - @Inject ConfigurationDao _configDao; + @Inject + protected List<AgentLoadBalancerPlanner> _lbPlanners; + @Inject + ConfigurationDao _configDao; @Inject ConfigDepot _configDepot; @@ -168,9 +163,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (s_logger.isDebugEnabled()) { s_logger.debug("Scheduled direct agent scan task to run at an interval of " + ScanInterval.value() + " seconds"); } - - // schedule transfer scan executor - if agent LB is enabled + + // Schedule tasks for agent rebalancing if (isAgentRebalanceEnabled()) { + s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 60000, TimeUnit.MILLISECONDS); s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, TimeUnit.MILLISECONDS); } @@ -571,6 +567,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } _timer.cancel(); + _agentLbTimer.cancel(); //cancel all transfer tasks s_transferExecutor.shutdownNow(); @@ -1354,44 +1351,52 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { - return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); + return executeRebalanceRequest(agentId, currentOwnerId, 
futureOwnerId, event); } public boolean isAgentRebalanceEnabled() { return EnableLB.value(); } - - private ClusteredAgentRebalanceService _rebalanceService; - - boolean _agentLbHappened = false; - public void agentrebalance() { - Profiler profilerAgentLB = new Profiler(); - profilerAgentLB.start(); - //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold - if (EnableLB.value() && !_agentLbHappened) { - QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); - sc.and(sc.entity().getManagementServerId(), Op.NNULL); - sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List<HostVO> allManagedRoutingAgents = sc.list(); - - sc = QueryBuilder.create(HostVO.class); - sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); - List<HostVO> allAgents = sc.list(); - double allHostsCount = allAgents.size(); - double managedHostsCount = allManagedRoutingAgents.size(); - if (allHostsCount > 0.0) { - double load = managedHostsCount / allHostsCount; - if (load >= ConnectedAgentThreshold.value()) { - s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value()); - _rebalanceService.scheduleRebalanceAgents(); - _agentLbHappened = true; - } else { - s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value()); + + + private Runnable getAgentRebalanceScanTask() { + return new ManagedContextRunnable() { + @Override + protected void runInContext() { + try { + if (s_logger.isTraceEnabled()) { + s_logger.trace("Agent rebalance task check, management server id:" + _nodeId); } + //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold + if (!_agentLbHappened) { + QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); + 
sc.and(sc.entity().getManagementServerId(), Op.NNULL); + sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); + List<HostVO> allManagedRoutingAgents = sc.list(); + + sc = QueryBuilder.create(HostVO.class); + sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); + List<HostVO> allAgents = sc.list(); + double allHostsCount = allAgents.size(); + double managedHostsCount = allManagedRoutingAgents.size(); + if (allHostsCount > 0.0) { + double load = managedHostsCount / allHostsCount; + if (load >= ConnectedAgentThreshold.value()) { + s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + ConnectedAgentThreshold.value()); + scheduleRebalanceAgents(); + _agentLbHappened = true; + } else { + s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + ConnectedAgentThreshold.value()); + } + } + } + } catch (Throwable e) { + s_logger.error("Problem with the clustered agent transfer scan check!", e); } } - profilerAgentLB.stop(); - } + }; +} + @Override public void rescan() {
