This is an automated email from the ASF dual-hosted git repository.
dahn pushed a commit to branch 4.18
in repository https://gitbox.apache.org/repos/asf/cloudstack.git
The following commit(s) were added to refs/heads/4.18 by this push:
new 3b11663d87e Fix failure on agent reconnection (#8089)
3b11663d87e is described below
commit 3b11663d87e3f3e643d37e1ba1338e9402a2622a
Author: Vishesh <[email protected]>
AuthorDate: Thu Oct 26 20:24:36 2023 +0530
Fix failure on agent reconnection (#8089)
---
agent/src/main/java/com/cloud/agent/Agent.java | 4 +
.../main/java/com/cloud/agent/api/PingAnswer.java | 13 +-
.../com/cloud/agent/manager/AgentManagerImpl.java | 152 +++++++++++++--------
test/integration/smoke/test_host_ping.py | 102 ++++++++++++++
4 files changed, 215 insertions(+), 56 deletions(-)
diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java
index e9213ca9b8c..e8558931cbf 100644
--- a/agent/src/main/java/com/cloud/agent/Agent.java
+++ b/agent/src/main/java/com/cloud/agent/Agent.java
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException;
+import com.cloud.agent.api.PingAnswer;
import com.cloud.utils.NumbersUtil;
import org.apache.cloudstack.agent.lb.SetupMSListAnswer;
import org.apache.cloudstack.agent.lb.SetupMSListCommand;
@@ -822,6 +823,9 @@ public class Agent implements HandlerFactory, IAgentControl
{
listener.processControlResponse(response,
(AgentControlAnswer)answer);
}
}
+ } else if (answer instanceof PingAnswer && (((PingAnswer)
answer).isSendStartup()) && _reconnectAllowed) {
+ s_logger.info("Management server requested startup command to
reinitialize the agent");
+ sendStartup(link);
} else {
setLastPingResponseTime();
}
diff --git a/core/src/main/java/com/cloud/agent/api/PingAnswer.java b/core/src/main/java/com/cloud/agent/api/PingAnswer.java
index 35242380739..6353b121583 100644
--- a/core/src/main/java/com/cloud/agent/api/PingAnswer.java
+++ b/core/src/main/java/com/cloud/agent/api/PingAnswer.java
@@ -22,15 +22,26 @@ package com.cloud.agent.api;
public class PingAnswer extends Answer {
private PingCommand _command = null;
+ private boolean sendStartup = false;
+
protected PingAnswer() {
}
- public PingAnswer(PingCommand cmd) {
+ public PingAnswer(PingCommand cmd, boolean sendStartup) {
super(cmd);
_command = cmd;
+ this.sendStartup = sendStartup;
}
public PingCommand getCommand() {
return _command;
}
+
+ public boolean isSendStartup() {
+ return sendStartup;
+ }
+
+ public void setSendStartup(boolean sendStartup) {
+ this.sendStartup = sendStartup;
+ }
}
diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
index b74c11cf138..26e871877a3 100644
--- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java
@@ -40,6 +40,7 @@ import javax.naming.ConfigurationException;
import com.cloud.configuration.Config;
import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.db.GlobalLock;
import org.apache.cloudstack.agent.lb.IndirectAgentLB;
import org.apache.cloudstack.ca.CAManager;
import
org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
@@ -798,49 +799,65 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
return true;
}
+ protected Status getNextStatusOnDisconnection(Host host, final
Status.Event event) {
+ final Status currentStatus = host.getStatus();
+ Status nextStatus;
+ if (currentStatus == Status.Down || currentStatus == Status.Alert ||
currentStatus == Status.Removed) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("Host %s is already %s",
host.getUuid(), currentStatus));
+ }
+ nextStatus = currentStatus;
+ } else {
+ try {
+ nextStatus = currentStatus.getNextStatus(event);
+ } catch (final NoTransitionException e) {
+ final String err = String.format("Cannot find next status for
%s as current status is %s for agent %s", event, currentStatus, host.getUuid());
+ s_logger.debug(err);
+ throw new CloudRuntimeException(err);
+ }
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(String.format("The next status of agent %s is
%s, current status is %s", host.getUuid(), nextStatus, currentStatus));
+ }
+ }
+ return nextStatus;
+ }
+
protected boolean handleDisconnectWithoutInvestigation(final AgentAttache
attache, final Status.Event event, final boolean transitState, final boolean
removeAgent) {
final long hostId = attache.getId();
- s_logger.info("Host " + hostId + " is disconnecting with event " +
event);
- Status nextStatus = null;
- final HostVO host = _hostDao.findById(hostId);
- if (host == null) {
- s_logger.warn("Can't find host with " + hostId);
- nextStatus = Status.Removed;
- } else {
- final Status currentStatus = host.getStatus();
- if (currentStatus == Status.Down || currentStatus == Status.Alert
|| currentStatus == Status.Removed) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Host " + hostId + " is already " +
currentStatus);
- }
- nextStatus = currentStatus;
- } else {
- try {
- nextStatus = currentStatus.getNextStatus(event);
- } catch (final NoTransitionException e) {
- final String err = "Cannot find next status for " + event
+ " as current status is " + currentStatus + " for agent " + hostId;
- s_logger.debug(err);
- throw new CloudRuntimeException(err);
+ boolean result = false;
+ GlobalLock joinLock = getHostJoinLock(hostId);
+ if (joinLock.lock(60)) {
+ try {
+ s_logger.info(String.format("Host %d is disconnecting with
event %s", hostId, event));
+ Status nextStatus = null;
+ final HostVO host = _hostDao.findById(hostId);
+ if (host == null) {
+ s_logger.warn(String.format("Can't find host with %d",
hostId));
+ nextStatus = Status.Removed;
+ } else {
+ nextStatus = getNextStatusOnDisconnection(host, event);
+ caService.purgeHostCertificate(host);
}
if (s_logger.isDebugEnabled()) {
- s_logger.debug("The next status of agent " + hostId + "is
" + nextStatus + ", current status is " + currentStatus);
+ s_logger.debug(String.format("Deregistering link for %d
with state %s", hostId, nextStatus));
}
- }
- caService.purgeHostCertificate(host);
- }
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Deregistering link for " + hostId + " with state "
+ nextStatus);
- }
+ removeAgent(attache, nextStatus);
- removeAgent(attache, nextStatus);
- // update the DB
- if (host != null && transitState) {
- disconnectAgent(host, event, _nodeId);
+ if (host != null && transitState) {
+ // update the state for host in DB as per the event
+ disconnectAgent(host, event, _nodeId);
+ }
+ } finally {
+ joinLock.unlock();
+ }
+ result = true;
}
-
- return true;
+ joinLock.releaseRef();
+ return result;
}
protected boolean handleDisconnectWithInvestigation(final AgentAttache
attache, Status.Event event) {
@@ -1101,26 +1118,23 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
return attache;
}
- private AgentAttache handleConnectedAgent(final Link link, final
StartupCommand[] startup, final Request request) {
- AgentAttache attache = null;
- ReadyCommand ready = null;
- try {
- final List<String> agentMSHostList = new ArrayList<>();
- String lbAlgorithm = null;
- if (startup != null && startup.length > 0) {
- final String agentMSHosts = startup[0].getMsHostList();
- if (StringUtils.isNotEmpty(agentMSHosts)) {
- String[] msHosts = agentMSHosts.split("@");
- if (msHosts.length > 1) {
- lbAlgorithm = msHosts[1];
- }
-
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
+ private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand
ready, Link link, StartupCommand[] startup) throws ConnectionException {
+ final List<String> agentMSHostList = new ArrayList<>();
+ String lbAlgorithm = null;
+ if (startup != null && startup.length > 0) {
+ final String agentMSHosts = startup[0].getMsHostList();
+ if (StringUtils.isNotEmpty(agentMSHosts)) {
+ String[] msHosts = agentMSHosts.split("@");
+ if (msHosts.length > 1) {
+ lbAlgorithm = msHosts[1];
}
+ agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
}
-
- final HostVO host =
_resourceMgr.createHostVOForConnectedAgent(startup);
- if (host != null) {
- ready = new ReadyCommand(host.getDataCenterId(), host.getId(),
NumbersUtil.enableHumanReadableSizes);
+ }
+ AgentAttache attache = null;
+ GlobalLock joinLock = getHostJoinLock(host.getId());
+ if (joinLock.lock(60)) {
+ try {
if (!indirectAgentLB.compareManagementServerList(host.getId(),
host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
final List<String> newMSList =
indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(),
null);
@@ -1132,6 +1146,24 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
attache = createAttacheForConnect(host, link);
attache = notifyMonitorsOfConnection(attache, startup, false);
+ } finally {
+ joinLock.unlock();
+ }
+ } else {
+ throw new ConnectionException(true, "Unable to acquire lock on
host " + host.getUuid());
+ }
+ joinLock.releaseRef();
+ return attache;
+ }
+
+ private AgentAttache handleConnectedAgent(final Link link, final
StartupCommand[] startup, final Request request) {
+ AgentAttache attache = null;
+ ReadyCommand ready = null;
+ try {
+ final HostVO host =
_resourceMgr.createHostVOForConnectedAgent(startup);
+ if (host != null) {
+ ready = new ReadyCommand(host.getDataCenterId(), host.getId(),
NumbersUtil.enableHumanReadableSizes);
+ attache = sendReadyAndGetAttache(host, ready, link, startup);
}
} catch (final Exception e) {
s_logger.debug("Failed to handle host connection: ", e);
@@ -1265,6 +1297,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
connectAgent(link, cmds, request);
}
return;
+ } else if (cmd instanceof StartupCommand) {
+ connectAgent(link, cmds, request);
}
final long hostId = attache.getId();
@@ -1318,13 +1352,14 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
handleCommands(attache, request.getSequence(), new
Command[] {cmd});
if (cmd instanceof PingCommand) {
final long cmdHostId =
((PingCommand)cmd).getHostId();
+ boolean requestStartupCommand = false;
+ final HostVO host =
_hostDao.findById(Long.valueOf(cmdHostId));
+ boolean gatewayAccessible = true;
// if the router is sending a ping, verify the
// gateway was pingable
if (cmd instanceof PingRoutingCommand) {
- final boolean gatewayAccessible =
((PingRoutingCommand)cmd).isGatewayAccessible();
- final HostVO host =
_hostDao.findById(Long.valueOf(cmdHostId));
-
+ gatewayAccessible =
((PingRoutingCommand)cmd).isGatewayAccessible();
if (host != null) {
if (!gatewayAccessible) {
// alert that host lost connection to
@@ -1342,7 +1377,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
s_logger.debug("Not processing " +
PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + ";
can't find the host in the DB");
}
}
- answer = new PingAnswer((PingCommand)cmd);
+ if (host!= null && host.getStatus() != Status.Up
&& gatewayAccessible) {
+ requestStartupCommand = true;
+ }
+ answer = new PingAnswer((PingCommand)cmd,
requestStartupCommand);
} else if (cmd instanceof ReadyAnswer) {
final HostVO host =
_hostDao.findById(attache.getId());
if (host == null) {
@@ -1864,4 +1902,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
sendCommandToAgents(hostsPerZone, params);
}
}
+
+ private GlobalLock getHostJoinLock(Long hostId) {
+ return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join",
hostId));
+ }
}
diff --git a/test/integration/smoke/test_host_ping.py b/test/integration/smoke/test_host_ping.py
new file mode 100644
index 00000000000..9de77f9b771
--- /dev/null
+++ b/test/integration/smoke/test_host_ping.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+""" Check state transition of host from Alert to Up on Ping
+"""
+
+# Import Local Modules
+from marvin.cloudstackTestCase import *
+from marvin.lib.common import *
+from marvin.lib.utils import *
+from nose.plugins.attrib import attr
+
+_multiprocess_shared_ = False
+
+
+class TestHostPing(cloudstackTestCase):
+
+ def setUp(self, handler=logging.StreamHandler()):
+ self.logger = logging.getLogger('TestHM')
+ self.stream_handler = handler
+ self.logger.setLevel(logging.DEBUG)
+ self.logger.addHandler(self.stream_handler)
+ self.apiclient = self.testClient.getApiClient()
+ self.hypervisor = self.testClient.getHypervisorInfo()
+ self.mgtSvrDetails = self.config.__dict__["mgtSvr"][0].__dict__
+ self.dbConnection = self.testClient.getDbConnection()
+ self.services = self.testClient.getParsedTestDataConfig()
+ self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
+ self.pod = get_pod(self.apiclient, self.zone.id)
+ self.cleanup = []
+
+ def tearDown(self):
+ super(TestHostPing, self).tearDown()
+
+ def checkHostStateInCloudstack(self, state, host_id):
+ try:
+ listHost = Host.list(
+ self.apiclient,
+ type='Routing',
+ zoneid=self.zone.id,
+ podid=self.pod.id,
+ id=host_id
+ )
+ self.assertEqual(
+ isinstance(listHost, list),
+ True,
+ "Check if listHost returns a valid response"
+ )
+
+ self.assertEqual(
+ len(listHost),
+ 1,
+ "Check if listHost returns a host"
+ )
+ self.logger.debug(" Host state is %s " % listHost[0].state)
+ if listHost[0].state == state:
+ return True, 1
+ else:
+ return False, 1
+ except Exception as e:
+ self.logger.debug("Got exception %s" % e)
+ return False, 1
+
+ @attr(
+ tags=[
+ "advanced",
+ "advancedns",
+ "smoke",
+ "basic"],
+ required_hardware="true")
+ def test_01_host_ping_on_alert(self):
+ listHost = Host.list(
+ self.apiclient,
+ type='Routing',
+ zoneid=self.zone.id,
+ podid=self.pod.id,
+ )
+ for host in listHost:
+ self.logger.debug('Hypervisor = {}'.format(host.id))
+
+ hostToTest = listHost[0]
+ sql_query = "UPDATE host SET status = 'Alert' WHERE uuid = '" +
hostToTest.id + "'"
+ self.dbConnection.execute(sql_query)
+
+ hostUpInCloudstack = wait_until(30, 8,
self.checkHostStateInCloudstack, "Up", hostToTest.id)
+
+ if not (hostUpInCloudstack):
+ raise self.fail("Host is not up %s, in cloudstack so failing test
" % (hostToTest.ipaddress))
+ return