This is an automated email from the ASF dual-hosted git repository.
sureshanaparti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/main by this push:
new 4c3f29de1e1 Agent manager connection handling improvements (#11376)
4c3f29de1e1 is described below
commit 4c3f29de1e145e25df8b287cd53dad4f79188551
Author: Suresh Kumar Anaparti <[email protected]>
AuthorDate: Tue Aug 5 15:07:02 2025 +0530
Agent manager connection handling improvements (#11376)
* Agent manager connection handling improvements
* Fix to send LB check interval in ready command
---
agent/src/main/java/com/cloud/agent/Agent.java | 2 +-
.../java/com/cloud/agent/api/ReadyCommand.java | 10 +-
.../com/cloud/agent/manager/AgentManagerImpl.java | 153 ++++++++++++---------
.../cloud/agent/manager/ConnectedAgentAttache.java | 4 +-
.../cloudstack/agent/lb/IndirectAgentLB.java | 7 +-
.../agent/lb/IndirectAgentLBServiceImpl.java | 2 +-
6 files changed, 103 insertions(+), 75 deletions(-)
diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java
index f8ad3350dc9..e86557e9681 100644
--- a/agent/src/main/java/com/cloud/agent/Agent.java
+++ b/agent/src/main/java/com/cloud/agent/Agent.java
@@ -475,7 +475,7 @@ public class Agent implements HandlerFactory,
IAgentControl, AgentStatusUpdater
return;
}
- logger.info("Scheduling a recurring preferred host checker task
with lb algorithm '{}' and host.lb.interval={} ms", lbAlgorithm, checkInterval);
+ logger.info("Scheduling a recurring preferred host checker task
with host.lb.interval={} ms", checkInterval);
hostLbCheckExecutor =
Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name)));
hostLbCheckExecutor.scheduleAtFixedRate(new
PreferredHostCheckerTask(), checkInterval, checkInterval,
TimeUnit.MILLISECONDS);
diff --git a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java
index 49768297ad5..ee61fee66c6 100644
--- a/core/src/main/java/com/cloud/agent/api/ReadyCommand.java
+++ b/core/src/main/java/com/cloud/agent/api/ReadyCommand.java
@@ -26,10 +26,6 @@ import java.util.List;
public class ReadyCommand extends Command {
private String _details;
- public ReadyCommand() {
- super();
- }
-
private Long dcId;
private Long hostId;
private String hostUuid;
@@ -41,6 +37,10 @@ public class ReadyCommand extends Command {
private Boolean enableHumanReadableSizes;
private String arch;
+ public ReadyCommand() {
+ super();
+ }
+
public ReadyCommand(Long dcId) {
super();
this.dcId = dcId;
@@ -95,7 +95,7 @@ public class ReadyCommand extends Command {
return avoidMsHostList;
}
- public void setAvoidMsHostList(List<String> msHostList) {
+ public void setAvoidMsHostList(List<String> avoidMsHostList) {
this.avoidMsHostList = avoidMsHostList;
}
diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
index 75e9fb20e5a..5e10312741f 100644
--- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -758,15 +759,15 @@ public class AgentManagerImpl extends ManagerBase
implements AgentManager, Handl
}
}
- protected AgentAttache notifyMonitorsOfConnection(final AgentAttache
attache, final StartupCommand[] cmd, final boolean forRebalance) throws
ConnectionException {
+ protected AgentAttache notifyMonitorsOfConnection(final AgentAttache
attache, final StartupCommand[] cmds, final boolean forRebalance) throws
ConnectionException {
final long hostId = attache.getId();
final HostVO host = _hostDao.findById(hostId);
for (final Pair<Integer, Listener> monitor : _hostMonitors) {
logger.debug("Sending Connect to listener: {}, for rebalance: {}",
monitor.second().getClass().getSimpleName(), forRebalance);
- for (int i = 0; i < cmd.length; i++) {
+ for (StartupCommand cmd : cmds) {
try {
- logger.debug("process connection to issue: {} for host:
{}, forRebalance: {}, connection transferred: {}",
ReflectionToStringBuilderUtils.reflectCollection(cmd[i]), hostId, forRebalance,
cmd[i].isConnectionTransferred());
- monitor.second().processConnect(host, cmd[i],
forRebalance);
+ logger.debug("process connection to issue: {} for host:
{}, forRebalance: {}",
ReflectionToStringBuilderUtils.reflectOnlySelectedFields(cmd, "id", "type",
"msHostList", "connectionTransferred"), hostId, forRebalance);
+ monitor.second().processConnect(host, cmd, forRebalance);
} catch (final ConnectionException ce) {
if (ce.isSetupError()) {
logger.warn("Monitor {} says there is an error in the
connect process for {} due to {}", monitor.second().getClass().getSimpleName(),
hostId, ce.getMessage());
@@ -1040,39 +1041,50 @@ public class AgentManagerImpl extends ManagerBase
implements AgentManager, Handl
protected boolean handleDisconnectWithoutInvestigation(final AgentAttache
attache, final Status.Event event, final boolean transitState, final boolean
removeAgent) {
final long hostId = attache.getId();
-
+ final HostVO host = _hostDao.findById(hostId);
boolean result = false;
GlobalLock joinLock = getHostJoinLock(hostId);
- if (joinLock.lock(60)) {
- try {
- logger.info("Host {} is disconnecting with event {}",
- attache, event);
- Status nextStatus;
- final HostVO host = _hostDao.findById(hostId);
- if (host == null) {
- logger.warn("Can't find host with {} ({})", hostId,
attache);
- nextStatus = Status.Removed;
- } else {
- nextStatus = getNextStatusOnDisconnection(host, event);
- caService.purgeHostCertificate(host);
- }
- logger.debug("Deregistering link for {} with state {}",
attache, nextStatus);
-
- removeAgent(attache, nextStatus);
-
- if (host != null && transitState) {
- // update the state for host in DB as per the event
- disconnectAgent(host, event, _nodeId);
- }
- } finally {
- joinLock.unlock();
+ try {
+ if (!joinLock.lock(60)) {
+ logger.debug("Unable to acquire lock on host {} to process
agent disconnection", Objects.toString(host, String.valueOf(hostId)));
+ return result;
}
+
+ logger.debug("Acquired lock on host {}, to process agent
disconnection", Objects.toString(host, String.valueOf(hostId)));
+ disconnectHostAgent(attache, event, host, transitState, joinLock);
result = true;
+ } finally {
+ joinLock.releaseRef();
}
- joinLock.releaseRef();
+
return result;
}
+ private void disconnectHostAgent(final AgentAttache attache, final
Status.Event event, final HostVO host, final boolean transitState, final
GlobalLock joinLock) {
+ try {
+ logger.info("Host {} is disconnecting with event {}", attache,
event);
+ final long hostId = attache.getId();
+ Status nextStatus;
+ if (host == null) {
+ logger.warn("Can't find host with {} ({})", hostId, attache);
+ nextStatus = Status.Removed;
+ } else {
+ nextStatus = getNextStatusOnDisconnection(host, event);
+ caService.purgeHostCertificate(host);
+ }
+ logger.debug("Deregistering link for {} with state {}", attache,
nextStatus);
+
+ removeAgent(attache, nextStatus);
+
+ if (host != null && transitState) {
+ // update the state for host in DB as per the event
+ disconnectAgent(host, event, _nodeId);
+ }
+ } finally {
+ joinLock.unlock();
+ }
+ }
+
protected boolean handleDisconnectWithInvestigation(final AgentAttache
attache, Status.Event event) {
final long hostId = attache.getId();
HostVO host = _hostDao.findById(hostId);
@@ -1341,45 +1353,58 @@ public class AgentManagerImpl extends ManagerBase
implements AgentManager, Handl
return attache;
}
- private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand
ready, Link link, StartupCommand[] startup) throws ConnectionException {
- final List<String> agentMSHostList = new ArrayList<>();
- String lbAlgorithm = null;
- if (startup != null && startup.length > 0) {
- final String agentMSHosts = startup[0].getMsHostList();
- if (StringUtils.isNotEmpty(agentMSHosts)) {
- String[] msHosts = agentMSHosts.split("@");
- if (msHosts.length > 1) {
- lbAlgorithm = msHosts[1];
- }
- agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
- }
- }
- ready.setArch(host.getArch().getType());
+ private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand
ready, Link link, StartupCommand[] startupCmds) throws ConnectionException {
AgentAttache attache;
GlobalLock joinLock = getHostJoinLock(host.getId());
- if (joinLock.lock(60)) {
- try {
+ try {
+ if (!joinLock.lock(60)) {
+ throw new ConnectionException(true, String.format("Unable to
acquire lock on host %s, to process agent connection", host));
+ }
+
+ logger.debug("Acquired lock on host {}, to process agent
connection", host);
+ attache = connectHostAgent(host, ready, link, startupCmds,
joinLock);
+ } finally {
+ joinLock.releaseRef();
+ }
- if (!indirectAgentLB.compareManagementServerList(host.getId(),
host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
- final List<String> newMSList =
indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(),
null);
- ready.setMsHostList(newMSList);
- final List<String> avoidMsList =
_mshostDao.listNonUpStateMsIPs();
- ready.setAvoidMsHostList(avoidMsList);
- ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName());
-
ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId()));
- logger.debug("Agent's management server host list is not
up to date, sending list update: {}", newMSList);
+ return attache;
+ }
+
+ private AgentAttache connectHostAgent(HostVO host, ReadyCommand ready,
Link link, StartupCommand[] startupCmds, GlobalLock joinLock) throws
ConnectionException {
+ AgentAttache attache;
+ try {
+ final List<String> agentMSHostList = new ArrayList<>();
+ String lbAlgorithm = null;
+ if (startupCmds != null && startupCmds.length > 0) {
+ final String agentMSHosts = startupCmds[0].getMsHostList();
+ if (StringUtils.isNotEmpty(agentMSHosts)) {
+ String[] msHosts = agentMSHosts.split("@");
+ if (msHosts.length > 1) {
+ lbAlgorithm = msHosts[1];
+ }
+
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
}
+ }
- attache = createAttacheForConnect(host, link);
- attache = notifyMonitorsOfConnection(attache, startup, false);
- } finally {
- joinLock.unlock();
+ if
(!indirectAgentLB.compareManagementServerListAndLBAlgorithm(host.getId(),
host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
+ final List<String> newMSList =
indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(),
null);
+ ready.setMsHostList(newMSList);
+ String newLBAlgorithm = indirectAgentLB.getLBAlgorithmName();
+ ready.setLbAlgorithm(newLBAlgorithm);
+ logger.debug("Agent's management server host list or lb
algorithm is not up to date, sending list and algorithm update: {}, {}",
newMSList, newLBAlgorithm);
}
- } else {
- throw new ConnectionException(true,
- String.format("Unable to acquire lock on host %s", host));
+
+ final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs();
+ ready.setAvoidMsHostList(avoidMsList);
+
ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId()));
+ ready.setArch(host.getArch().getType());
+
+ attache = createAttacheForConnect(host, link);
+ attache = notifyMonitorsOfConnection(attache, startupCmds, false);
+ } finally {
+ joinLock.unlock();
}
- joinLock.releaseRef();
+
return attache;
}
@@ -1666,7 +1691,7 @@ public class AgentManagerImpl extends ManagerBase
implements AgentManager, Handl
logger.debug("Not processing {} for agent
id={}; can't find the host in the DB",
PingRoutingCommand.class.getSimpleName(), cmdHostId);
}
}
- if (host!= null && host.getStatus() != Status.Up
&& gatewayAccessible) {
+ if (host != null && host.getStatus() != Status.Up
&& gatewayAccessible) {
requestStartupCommand = true;
}
final List<String> avoidMsList =
_mshostDao.listNonUpStateMsIPs();
@@ -1821,11 +1846,11 @@ public class AgentManagerImpl extends ManagerBase
implements AgentManager, Handl
return false;
}
- private void disconnectInternal(final long hostId, final Status.Event
event, final boolean invstigate) {
+ private void disconnectInternal(final long hostId, final Status.Event
event, final boolean investigate) {
final AgentAttache attache = findAttache(hostId);
if (attache != null) {
- if (!invstigate) {
+ if (!investigate) {
disconnectWithoutInvestigation(attache, event);
} else {
disconnectWithInvestigation(attache, event);
diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java
index f208a81b422..f4efaaa34a4 100644
--- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java
+++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ConnectedAgentAttache.java
@@ -54,8 +54,10 @@ public class ConnectedAgentAttache extends AgentAttache {
@Override
public void disconnect(final Status state) {
synchronized (this) {
- logger.debug("Processing Disconnect.");
+ logger.debug("Processing disconnect [id: {}, uuid: {}, name: {}]",
_id, _uuid, _name);
+
if (_link != null) {
+ logger.debug("Disconnecting from {}, Socket Address: {}",
_link.getIpAddress(), _link.getSocketAddress());
_link.close();
_link.terminated();
}
diff --git
a/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java
b/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java
index 780a09b883e..7c7e2605e74 100644
---
a/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java
+++
b/framework/agent-lb/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLB.java
@@ -48,13 +48,14 @@ public interface IndirectAgentLB {
List<String> getManagementServerList(Long hostId, Long dcId, List<Long>
orderedHostIdList, String lbAlgorithm);
/**
- * Compares received management server list against expected list for a
host in a zone.
+ * Compares received management server list against expected list for a
host in a zone and LB algorithm.
* @param hostId host id
* @param dcId zone id
* @param receivedMSHosts received management server list
- * @return true if mgmtHosts is up to date, false if not
+ * @param lbAlgorithm received LB algorithm
+ * @return true if mgmtHosts and LB algorithm are up to date, false if not
*/
- boolean compareManagementServerList(Long hostId, Long dcId, List<String>
receivedMSHosts, String lbAlgorithm);
+ boolean compareManagementServerListAndLBAlgorithm(Long hostId, Long dcId,
List<String> receivedMSHosts, String lbAlgorithm);
/**
* Returns the configure LB algorithm
diff --git a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
index fc893a7ef50..dc7b6282b08 100644
--- a/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
+++ b/server/src/main/java/org/apache/cloudstack/agent/lb/IndirectAgentLBServiceImpl.java
@@ -149,7 +149,7 @@ public class IndirectAgentLBServiceImpl extends
ComponentLifecycleBase implement
}
@Override
- public boolean compareManagementServerList(final Long hostId, final Long
dcId, final List<String> receivedMSHosts, final String lbAlgorithm) {
+ public boolean compareManagementServerListAndLBAlgorithm(final Long
hostId, final Long dcId, final List<String> receivedMSHosts, final String
lbAlgorithm) {
if (receivedMSHosts == null || receivedMSHosts.isEmpty()) {
return false;
}