This is an automated email from the ASF dual-hosted git repository. sureshanaparti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/main by this push: new 4c3f29de1e1 Agent manager connection handling improvements (#11376) 4c3f29de1e1 is described below commit 4c3f29de1e145e25df8b287cd53dad4f79188551 Author: Suresh Kumar Anaparti <sureshkumar.anapa...@gmail.com> AuthorDate: Tue Aug 5 15:07:02 2025 +0530 Agent manager connection handling improvements (#11376) * Agent manager connection handling improvements * Fix to send LB check interval in ready command --- agent/src/main/java/com/cloud/agent/Agent.java | 2 +- .../java/com/cloud/agent/api/ReadyCommand.java | 10 +- .../com/cloud/agent/manager/AgentManagerImpl.java | 153 ++++++++++++--------- .../cloud/agent/manager/ConnectedAgentAttache.java | 4 +- .../cloudstack/agent/lb/IndirectAgentLB.java | 7 +- .../agent/lb/IndirectAgentLBServiceImpl.java | 2 +- 6 files changed, 103 insertions(+), 75 deletions(-) diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java index f8ad3350dc9..e86557e9681 100644 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@ -475,7 +475,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater return; } - logger.info("Scheduling a recurring preferred host checker task with lb algorithm '{}' and host.lb.interval={} ms", lbAlgorithm, checkInterval); + logger.info("Scheduling a recurring preferred host checker task with host.lb.interval={} ms", checkInterval); hostLbCheckExecutor = Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name))); hostLbCheckExecutor.scheduleAtFixedRate(new PreferredHostCheckerTask(), checkInterval, checkInterval, TimeUnit.MILLISECONDS); diff --git a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java index 49768297ad5..ee61fee66c6 100644 --- a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java +++ b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java @@ -26,10 +26,6 @@ import java.util.List; public class ReadyCommand extends Command { private String _details; - public ReadyCommand() { - super(); - } - private Long dcId; private Long hostId; private String hostUuid; @@ -41,6 +37,10 @@ public class ReadyCommand extends Command { private Boolean enableHumanReadableSizes; private String arch; + public ReadyCommand() { + super(); + } + public ReadyCommand(Long dcId) { super(); this.dcId = dcId; @@ -95,7 +95,7 @@ public class ReadyCommand extends Command { return avoidMsHostList; } - public void setAvoidMsHostList(List<String> msHostList) { + public void setAvoidMsHostList(List<String> avoidMsHostList) { this.avoidMsHostList = avoidMsHostList; } diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index 75e9fb20e5a..5e10312741f 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -27,6 +27,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -758,15 +759,15 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } } - protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmd, final boolean forRebalance) throws ConnectionException { + protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmds, final boolean forRebalance) throws ConnectionException { final long hostId = attache.getId(); final HostVO host = _hostDao.findById(hostId); for (final Pair<Integer, Listener> monitor : _hostMonitors) { logger.debug("Sending Connect to listener: {}, for rebalance: {}", monitor.second().getClass().getSimpleName(), forRebalance); - for (int i = 0; i < cmd.length; i++) { + for (StartupCommand cmd : cmds) { try { - logger.debug("process connection to issue: {} for host: {}, forRebalance: {}, connection transferred: {}", ReflectionToStringBuilderUtils.reflectCollection(cmd[i]), hostId, forRebalance, cmd[i].isConnectionTransferred()); - monitor.second().processConnect(host, cmd[i], forRebalance); + logger.debug("process connection to issue: {} for host: {}, forRebalance: {}", ReflectionToStringBuilderUtils.reflectOnlySelectedFields(cmd, "id", "type", "msHostList", "connectionTransferred"), hostId, forRebalance); + monitor.second().processConnect(host, cmd, forRebalance); } catch (final ConnectionException ce) { if (ce.isSetupError()) { logger.warn("Monitor {} says there is an error in the connect process for {} due to {}", monitor.second().getClass().getSimpleName(), hostId, ce.getMessage()); @@ -1040,39 +1041,50 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { final long hostId = attache.getId(); - + final HostVO host = _hostDao.findById(hostId); boolean result = false; GlobalLock joinLock = getHostJoinLock(hostId); - if (joinLock.lock(60)) { - try { - logger.info("Host {} is disconnecting with event {}", - attache, event); - Status nextStatus; - final HostVO host = _hostDao.findById(hostId); - if (host == null) { - logger.warn("Can't find host with {} ({})", hostId, attache); - nextStatus = Status.Removed; - } else { - nextStatus = getNextStatusOnDisconnection(host, event); - caService.purgeHostCertificate(host); - } - logger.debug("Deregistering link for {} with state {}", attache, nextStatus); - - removeAgent(attache, nextStatus); - - if (host != null && transitState) { - // update the state for host in DB as per the event - disconnectAgent(host, event, _nodeId); - } - } finally { - joinLock.unlock(); + try { + if (!joinLock.lock(60)) { + logger.debug("Unable to acquire lock on host {} to process agent disconnection", Objects.toString(host, String.valueOf(hostId))); + return result; } + + logger.debug("Acquired lock on host {}, to process agent disconnection", Objects.toString(host, String.valueOf(hostId))); + disconnectHostAgent(attache, event, host, transitState, joinLock); result = true; + } finally { + joinLock.releaseRef(); } - joinLock.releaseRef(); + return result; } + private void disconnectHostAgent(final AgentAttache attache, final Status.Event event, final HostVO host, final boolean transitState, final GlobalLock joinLock) { + try { + logger.info("Host {} is disconnecting with event {}", attache, event); + final long hostId = attache.getId(); + Status nextStatus; + if (host == null) { + logger.warn("Can't find host with {} ({})", hostId, attache); + nextStatus = Status.Removed; + } else { + nextStatus = getNextStatusOnDisconnection(host, event); + caService.purgeHostCertificate(host); + } + logger.debug("Deregistering link for {} with state {}", attache, nextStatus); + + removeAgent(attache, nextStatus); + + if (host != null && transitState) { + // update the state for host in DB as per the event + disconnectAgent(host, event, _nodeId); + } + } finally { + joinLock.unlock(); + } + } + protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { final long hostId = attache.getId(); HostVO host = _hostDao.findById(hostId); @@ -1341,45 +1353,58 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return attache; } - private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException { - final List<String> agentMSHostList = new ArrayList<>(); - String lbAlgorithm = null; - if (startup != null && startup.length > 0) { - final String agentMSHosts = startup[0].getMsHostList(); - if (StringUtils.isNotEmpty(agentMSHosts)) { - String[] msHosts = agentMSHosts.split("@"); - if (msHosts.length > 1) { - lbAlgorithm = msHosts[1]; - } - agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); - } - } - ready.setArch(host.getArch().getType()); + private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds) throws ConnectionException { AgentAttache attache; GlobalLock joinLock = getHostJoinLock(host.getId()); - if (joinLock.lock(60)) { - try { + try { + if (!joinLock.lock(60)) { + throw new ConnectionException(true, String.format("Unable to acquire lock on host %s, to process agent connection", host)); + } + + logger.debug("Acquired lock on host {}, to process agent connection", host); + attache = connectHostAgent(host, ready, link, startupCmds, joinLock); + } finally { + joinLock.releaseRef(); + } - if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { - final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); - ready.setMsHostList(newMSList); - final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); - ready.setAvoidMsHostList(avoidMsList); - ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName()); - ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); - logger.debug("Agent's management server host list is not up to date, sending list update: {}", newMSList); + return attache; + } + + private AgentAttache connectHostAgent(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds, GlobalLock joinLock) throws ConnectionException { + AgentAttache attache; + try { + final List<String> agentMSHostList = new ArrayList<>(); + String lbAlgorithm = null; + if (startupCmds != null && startupCmds.length > 0) { + final String agentMSHosts = startupCmds[0].getMsHostList(); + if (StringUtils.isNotEmpty(agentMSHosts)) { + String[] msHosts = agentMSHosts.split("@"); + if (msHosts.length > 1) { + lbAlgorithm = msHosts[1]; + } + agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); } + } - attache = createAttacheForConnect(host, link); - attache = notifyMonitorsOfConnection(attache, startup, false); - } finally { - joinLock.unlock(); + if (!indirectAgentLB.compareManagementServerListAndLBAlgorithm(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { + final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); + ready.setMsHostList(newMSList); + String newLBAlgorithm = indirectAgentLB.getLBAlgorithmName(); + ready.setLbAlgorithm(newLBAlgorithm); + logger.debug("Agent's management server host list or lb algorithm is not up to date, sending list and algorithm update: {}, {}", newMSList, newLBAlgorithm); } - } else { - throw new ConnectionException(true, - String.format("Unable to acquire lock on host %s", host)); + + final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); + ready.setAvoidMsHostList(avoidMsList); + ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); + ready.setArch(host.getArch().getType()); + + attache = createAttacheForConnect(host, link); + attache = notifyMonitorsOfConnection(attache, startupCmds, false); + } finally { + joinLock.unlock(); } - joinLock.releaseRef(); + return attache; } @@ -1666,7 +1691,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl logger.debug("Not processing {} for agent id={}; can't find the host in the DB", PingRoutingCommand.class.getSimpleName(), cmdHostId); } } - if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) { + if (host != null && host.getStatus() != Status.Up && gatewayAccessible) { requestStartupCommand = true; } final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs(); @@ -1821,11 +1846,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return false; } - private void disconnectInternal(final long hostId, final Status.Event event, final boolean invstigate) { + private void disconnectInternal(final long hostId, final Status.Event event, final boolean investigate) { final AgentAttache attache = findAttache(hostId); if (attache != null) { - if (!invstigate) { + if (!investigate) { disconnectWithoutInvestigation(attache, event); } else { disconnectWithInvestigation(attache, event); diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java index f208a81b422..f4efaaa34a4 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java @@ -54,8 +54,10 @@ public class ConnectedAgentAttache extends AgentAttache { @Override public void disconnect(final Status state) { synchronized (this) { - logger.debug("Processing Disconnect."); + logger.debug("Processing disconnect [id: {}, uuid: {}, name: {}]", _id, _uuid, _name); + if (_link != null) { + logger.debug("Disconnecting from {}, Socket Address: {}", _link.getIpAddress(), _link.getSocketAddress()); _link.close(); _link.terminated(); } diff --git a/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java b/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java index 780a09b883e..7c7e2605e74 100644 --- a/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java +++ b/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java @@ -48,13 +48,14 @@ public interface IndirectAgentLB { List<String> getManagementServerList(Long hostId, Long dcId, List<Long> orderedHostIdList, String lbAlgorithm); /** - * Compares received management server list against expected list for a host in a zone. + * Compares received management server list against expected list for a host in a zone and LB algorithm. * @param hostId host id * @param dcId zone id * @param receivedMSHosts received management server list - * @return true if mgmtHosts is up to date, false if not + * @param lbAlgorithm received LB algorithm + * @return true if mgmtHosts and LB algorithm are up to date, false if not */ - boolean compareManagementServerList(Long hostId, Long dcId, List<String> receivedMSHosts, String lbAlgorithm); + boolean compareManagementServerListAndLBAlgorithm(Long hostId, Long dcId, List<String> receivedMSHosts, String lbAlgorithm); /** * Returns the configure LB algorithm diff --git a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java index fc893a7ef50..dc7b6282b08 100644 --- a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java +++ b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java @@ -149,7 +149,7 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement } @Override - public boolean compareManagementServerList(final Long hostId, final Long dcId, final List<String> receivedMSHosts, final String lbAlgorithm) { + public boolean compareManagementServerListAndLBAlgorithm(final Long hostId, final Long dcId, final List<String> receivedMSHosts, final String lbAlgorithm) { if (receivedMSHosts == null || receivedMSHosts.isEmpty()) { return false; }