Copilot commented on code in PR #13345:
URL: https://github.com/apache/cloudstack/pull/13345#discussion_r3354649209
##########
framework/db/src/main/java/com/cloud/utils/db/GlobalLock.java:
##########
@@ -45,127 +43,248 @@
* </p>
*/
public class GlobalLock {
- protected Logger logger = LogManager.getLogger(getClass());
+ protected final static Logger logger =
LogManager.getLogger(GlobalLock.class);
private String name;
- private int lockCount = 0;
- private Thread ownerThread = null;
-
- private int referenceCount = 0;
- private long holdingStartTick = 0;
-
- private static Map<String, GlobalLock> s_lockMap = new HashMap<String,
GlobalLock>();
+ /**
+ * DB lock count.
+ * Increments on {@link GlobalLock#lock(int)} and decrements on {@link
GlobalLock#unlock()}.
+ * Upon {@link GlobalLock#unlock()}, if {@link GlobalLock#lockCount} is
less than 1, then lock removed from DB
+ */
+ private int lockCount;
+
+ /**
+ * Internal (in-memory) lock count.
+ * Increments on {@link GlobalLock#addRef()} and indirectly on {@link
GlobalLock#getInternLock(String)} and
+ * decrements on {@link GlobalLock#releaseRef()}, {@link
GlobalLock#unlock()} and on {@link GlobalLock#lock(int)}
+ * if DB lock is unsuccessful
+ */
+ private int referenceCount;
+
+ /**
+ * Thread that owns lock. If lock called from different thread, it will be
waiting for the owner to unlock it
+ * within requested timeout. If owner thread call {@link
GlobalLock#lock(int)} again, then
+ * {@link GlobalLock#lockCount} will be incremented.
+ * If {@link GlobalLock#unlock()} called by owner thread, or DB lock will
be unsuccessful, then owner thread will be
+ * nullified.
+ */
+ private Thread ownerThread;
+
+ /**
+ * Variable to hold lock duration in milliseconds. Used for information
only.
+ */
+ private long holdingStartTick;
+
+ /**
+ * Holds all created locks.
+ */
+ private static Map<String, GlobalLock> s_lockMap = new HashMap<>();
+
+ /**
+ * Create lock.
+ *
+ * @param name lock name
+ */
private GlobalLock(String name) {
this.name = name;
}
+ /**
+ * Increment reference count to lock.
+ *
+ * @return reference count
+ */
public int addRef() {
synchronized (this) {
referenceCount++;
return referenceCount;
}
}
+ /**
+ * Decrement reference count to lock.
+ *
+ * @return reference count
+ */
public int releaseRef() {
- int refCount;
-
boolean needToRemove = false;
synchronized (this) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Releasing reference for internal lock {},
reference count: {}, lock count: {}",
+ name, referenceCount, lockCount);
+ }
referenceCount--;
- refCount = referenceCount;
-
- if (referenceCount < 0)
- logger.warn("Unmatched Global lock " + name + " reference
usage detected, check your code!");
- if (referenceCount == 0)
+ if (referenceCount < 0) {
+ logger.warn("Unmatched internal lock {} reference usage
detected (reference count: {}, " +
+ "lock count: {}), check your code!", name,
referenceCount, lockCount);
+ } else if (referenceCount < 1) {
needToRemove = true;
+ }
}
- if (needToRemove)
+ if (needToRemove) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Need to release internal lock {}", name);
+ }
releaseInternLock(name);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Released reference for lock {}, reference count:
{}", name, referenceCount);
+ }
+ return referenceCount;
+ }
- return refCount;
+ public static boolean isLockAvailable(String name) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Checking lock present for {}", name);
+ }
+ boolean result = false;
+ try {
+ result = DbUtil.isFreeLock(name);
+ } finally {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Result of checking lock present for {}: {}",
name, result);
+ }
+ }
+ return result;
Review Comment:
GlobalLock.isLockAvailable() delegates to DbUtil.isFreeLock(), but its debug
messages say "lock present", which is the opposite of what IS_FREE_LOCK()
reports (a true result means the lock is free, not held). The inverted wording
makes troubleshooting lock-availability logic confusing.
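One way to keep the wording and the value in sync is to derive the text from
the flag itself. The sketch below assumes DbUtil.isFreeLock() returns true when
no session holds the lock; LockLogSketch and describe() are hypothetical names
used only for illustration, not part of the PR.

```java
// Hypothetical helper, not part of the PR: builds the log text from the
// "free" flag so the message cannot contradict the value it reports.
final class LockLogSketch {

    // Assumption: free == true means no session currently holds the lock.
    static String describe(String lockName, boolean free) {
        return String.format("Global lock %s is %s", lockName, free ? "free" : "held");
    }

    public static void main(String[] args) {
        System.out.println(describe("host.join.42", true));
        System.out.println(describe("host.join.42", false));
    }
}
```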
##########
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java:
##########
@@ -1544,24 +2009,26 @@ protected void runInContext() {
}
protected void connectAgent(final Link link, final Command[] cmds, final
Request request) {
- // send startupanswer to agent in the very beginning, so agent can
move on without waiting for the answer for an undetermined time, if we put this
logic into another
- // thread pool.
- final StartupAnswer[] answers = new StartupAnswer[cmds.length];
+ // send startup answer to agent in the very beginning, so agent can
move on without waiting for the answer for an undetermined time,
+ // if we put this logic into another thread pool.
+ Map<String, String> backoffConfiguration =
ConfigKeyUtil.toMap(BackoffConfiguration.value());
+ StartupAnswer[] answers = new StartupAnswer[cmds.length];
Command cmd;
for (int i = 0; i < cmds.length; i++) {
cmd = cmds[i];
- if (cmd instanceof StartupRoutingCommand || cmd instanceof
StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand ||
- cmd instanceof StartupStorageCommand) {
- answers[i] = new StartupAnswer((StartupCommand) cmds[i], 0,
"", "", mgmtServiceConf.getPingInterval());
- break;
+ if (cmd instanceof StartupRoutingCommand || cmd instanceof
StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand
+ || cmd instanceof StartupStorageCommand) {
+ StartupAnswer answer = new StartupAnswer((StartupCommand)
cmds[i], 0, "", "", mgmtServiceConf.getPingInterval());
+ answer.setParams(backoffConfiguration);
+
answer.setAgentHostStatusCheckDelaySec(AgentHostStatusCheckDelay.value());
+ answers[i] = answer;
}
}
- Response response;
- response = new Response(request, answers[0], _nodeId, -1);
+ Response response = new Response(request, answers, _nodeId, -1);
try {
Review Comment:
connectAgent() builds a StartupAnswer[] aligned to the incoming command
indexes, but Response.getAnswer() always returns answers[0]. If the
Startup*Command is not at index 0 (or if answers[0] remains null), the agent
will not treat the response as a StartupAnswer and the connect handshake can
fail silently.
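A possible mitigation, sketched with stand-in types: drop the null slots before
building the Response, so that whatever reads index 0 always sees a real
answer, and fail loudly when no startup command was present. Strings stand in
for StartupAnswer here, and StartupAnswerSketch and compact() are hypothetical
names.

```java
import java.util.Arrays;
import java.util.Objects;

// Stand-in sketch: Strings play the role of StartupAnswer objects. The real
// StartupAnswer and Response classes are not reproduced here.
final class StartupAnswerSketch {

    // Returns the non-null answers in their original order, so a consumer
    // that only reads index 0 never sees a null slot.
    static String[] compact(String[] answers) {
        return Arrays.stream(answers).filter(Objects::nonNull).toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Only the command at index 1 was a Startup*Command.
        String[] answers = {null, "StartupAnswer", null};
        String[] compacted = compact(answers);
        if (compacted.length == 0) {
            // Make the failure visible instead of sending an empty response.
            throw new IllegalStateException("No startup command found in request");
        }
        System.out.println(compacted[0]);
    }
}
```

Compacting gives up index alignment with the incoming commands, so it only
fits if no caller relies on that alignment.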
##########
engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java:
##########
@@ -454,6 +580,45 @@ private AgentControlAnswer handleControlCommand(final
AgentAttache attache, fina
return new AgentControlAnswer(cmd);
}
+ private AgentConnectStatusAnswer
handleAgentConnectStatusCommand(AgentAttache attache, AgentConnectStatusCommand
cmd) {
+ HostVO hostVo = _hostDao.findById(attache.getId());
+ return getConnectStatusAnswer(hostVo, cmd);
+ }
+
+ private AgentConnectStatusAnswer getConnectStatusAnswer(HostVO hostVo,
AgentConnectStatusCommand cmd) {
+ long hostId = hostVo.getId();
+ String hostName = hostVo.getName();
+ String lockName = getHostJoinLockName(hostId);
+ Status status = hostVo.getStatus();
+ try {
+ boolean lockAvailable = GlobalLock.isLockAvailable(lockName);
+ String details = String.format("Global lock %s is%s present for
%s", lockName, lockAvailable ? "" : " not",
+ hostName);
+ logger.debug(details);
+ return getAgentConnectStatusAnswer(cmd, lockName, hostName,
lockAvailable, status, details);
+ } catch (RuntimeException e) {
+ String msg = String.format("Failed to check global lock %s
presence for %s: %s", lockName, hostName,
+ e.getMessage());
+ logger.warn(msg, e);
+ return new AgentConnectStatusAnswer(cmd, false, msg);
+ }
Review Comment:
getConnectStatusAnswer() dereferences hostVo without a null-check
(hostVo.getId()/getName()), but hostVo is sourced from HostDao lookups that can
return null. This would turn an AgentConnectStatusCommand into an NPE and
prevent the agent from determining whether it can proceed with StartupCommand.
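A minimal sketch of the guard, using stand-in types rather than HostVO and
AgentConnectStatusAnswer. The class, method, and message text are hypothetical.
The point is only that the null case returns an explicit failure before any
dereference.

```java
// Stand-in sketch: Host plays the role of HostVO and the returned String plays
// the role of the answer details. All names here are hypothetical.
final class ConnectStatusSketch {

    static final class Host {
        final String name;

        Host(String name) {
            this.name = name;
        }
    }

    // Returns a failure message instead of dereferencing a missing host.
    static String statusFor(Host hostOrNull, long requestedId) {
        if (hostOrNull == null) {
            return "Host " + requestedId + " not found, cannot check join lock";
        }
        return "Checking join lock for " + hostOrNull.name;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(null, 7L));
        System.out.println(statusFor(new Host("kvm-01"), 7L));
    }
}
```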
##########
agent/src/test/java/com/cloud/agent/HostConnectProcessTest.java:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent;
+
+import com.cloud.exception.CloudException;
+import com.cloud.utils.nio.Link;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import javax.naming.ConfigurationException;
+
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HostConnectProcessTest {
+
+ private Agent agent;
+ private Logger logger;
+ private Link link;
+ private ServerAttache attache;
+ private HostConnectProcess hostConnectProcess;
+ private boolean connectionTransfer;
+
+ @Before
+ public void setUp() throws ConfigurationException {
+ agent = mock(Agent.class);
+ logger = mock(Logger.class);
+ link = mock(Link.class);
+ attache = mock(ServerAttache.class);
+ hostConnectProcess = new HostConnectProcess(agent);
+ ReflectionTestUtils.setField(agent, "logger", logger);
+ }
+
+ @Test
+ public void testScheduleConnectProcess() throws InterruptedException,
CloudException {
+
+ hostConnectProcess.scheduleConnectProcess(link, connectionTransfer);
+ Assert.assertTrue(hostConnectProcess.isInProgress());
+ }
+}
Review Comment:
HostConnectProcessTest schedules a task on a ScheduledExecutorService
(NamedThreadFactory creates non-daemon threads) but never shuts the executor
down. This can leak threads across the test suite and cause tests to hang.
Consider calling hostConnectProcess.stop() in an @After method.
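The cleanup pattern can be sketched with plain java.util.concurrent, outside
JUnit. ExecutorCleanupSketch is a hypothetical name and the default thread
factory stands in for NamedThreadFactory. It schedules a task the same way the
class under test does and then always shuts the pool down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: shows that a scheduled pool terminates promptly once
// shutdownNow() is called, which is what a test teardown should guarantee.
final class ExecutorCleanupSketch {

    static boolean scheduleAndCleanUp() {
        // The default thread factory also creates non-daemon threads.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.scheduleWithFixedDelay(() -> { }, 10, 10, TimeUnit.SECONDS);
        } finally {
            // Cancels the pending task and interrupts the idle worker thread.
            executor.shutdownNow();
        }
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + scheduleAndCleanUp());
    }
}
```

In the test itself this would likely amount to a teardown method that calls
hostConnectProcess.stop(), which the diff shows calling shutdownNow() on the
executor.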
##########
agent/src/main/java/com/cloud/agent/HostConnectProcess.java:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.agent;
+
+import com.cloud.agent.api.AgentConnectStatusAnswer;
+import com.cloud.agent.api.AgentConnectStatusCommand;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.StartupAnswer;
+import com.cloud.agent.api.StartupCommand;
+import com.cloud.agent.properties.AgentProperties;
+import com.cloud.agent.properties.AgentPropertiesFileHandler;
+import com.cloud.agent.transport.Request;
+import com.cloud.exception.CloudException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Status;
+import com.cloud.resource.ResourceStatusUpdater;
+import com.cloud.resource.ServerResource;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.nio.Link;
+import org.apache.cloudstack.threadcontext.ThreadContextUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.ThreadContext;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+public class HostConnectProcess {
+ private static final Logger logger =
LogManager.getLogger(HostConnectProcess.class);
+
+ public static final int DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC =
+
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_COMMAND_TIMEOUT_SEC);
+
+ public static final int DEFAULT_ASYNC_STARTUP_COMMAND_TIMEOUT_SEC =
+
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.ASYNC_STARTUP_COMMAND_TIMEOUT_SEC);
+
+ static final long HOST_STATUS_CHECK_INITIAL_DELAY_SEC = 10;
+ private long hostStatusCheckDelaySec =
AgentPropertiesFileHandler.getPropertyValue(AgentProperties.AGENT_HOST_STATUS_CHECK_DELAY_SEC);
+ private final AtomicReference<ScheduledFuture<?>> hostStatusFutureRef =
new AtomicReference<>();
+ private final Agent agent;
+ private ScheduledExecutorService hostStatusExecutor;
+
+ public HostConnectProcess(Agent agent) {
+ this.agent = agent;
+ initExecutors();
+ }
+
+ private void initExecutors() {
+ stop();
+ var threadFactory = new NamedThreadFactory("Agent-" +
HostStatusTask.class.getSimpleName());
+ hostStatusExecutor = Executors.newScheduledThreadPool(1,
threadFactory);
+ }
+
+ /**
+ * Stops the whole connect process and cancels all scheduled asynchronous
tasks.
+ * Returns {@link Boolean#TRUE} if {@link HostConnectProcess} was waiting
for {@link StartupAnswer}.
+ */
+ public boolean stop() {
+ logger.debug("Stopping connect process. The process is active: {}",
isInProgress());
+ stopHostStatusExecutor();
+ logger.debug("Stopped executor");
+ Optional<? extends ScheduledFuture<?>> hostStatusOpt =
Optional.ofNullable(hostStatusFutureRef.getAndSet(null))
+ .filter(Predicate.not(ScheduledFuture::isCancelled));
+
+ hostStatusOpt.ifPresent(future -> future.cancel(true));
+ logger.debug("Cancelled future");
+
+ return hostStatusOpt.isPresent();
+ }
+
+ private void stopHostStatusExecutor() {
+ if (hostStatusExecutor != null) {
+ hostStatusExecutor.shutdownNow();
+ hostStatusExecutor = null;
+ }
+ }
+
+ public void scheduleConnectProcess(Link link, boolean connectionTransfer) {
+ logger.debug("Scheduling connect process for {}", link);
+ initExecutors();
+
+ var task = new HostStatusTask(link, connectionTransfer, agent,
hostStatusFutureRef);
+ var future =
hostStatusExecutor.scheduleWithFixedDelay(ThreadContextUtil.wrapThreadContext(task),
+ HOST_STATUS_CHECK_INITIAL_DELAY_SEC,
+ hostStatusCheckDelaySec, TimeUnit.SECONDS);
+ hostStatusFutureRef.set(future);
+ }
+
+ /**
+ * Returns {@link Boolean#TRUE} if {@link HostStatusTask} created and
scheduled.
+ * That means there is already {@link Status#Connecting} process is
running.
+ */
+ public boolean isInProgress() {
+ return Optional.ofNullable(hostStatusFutureRef.get())
+
.filter(Predicate.not(ScheduledFuture::isCancelled)).isPresent();
+ }
+
+ public void updateHostStatusCheckDelay(int newDelaySec) {
+ logger.info("Updating host status check delay from {} to {} seconds",
hostStatusCheckDelaySec, newDelaySec);
+ this.hostStatusCheckDelaySec = newDelaySec;
+ }
+
+ /**
+ * Task wait for the Host to be available to connect to submit {@link
StartupCommand}.
+ * Checks Host status on Management Server cluster and submit {@link
StartupCommand} only if there is no lock and
+ * Host is not {@link Status#Connecting}.
+ */
+ public static class HostStatusTask implements Runnable, AsyncSend {
+ private final Set<Status> operationalStatuses =
Set.of(Status.Connecting, Status.Up, Status.Rebalancing);
+
+ private final Link _link;
+ private final boolean _forceConnect;
+ private final Agent _agent;
+ private final AtomicReference<? extends ScheduledFuture<?>> _futureRef;
+
+ public HostStatusTask(Link link, boolean forceConnect, Agent agent,
+ AtomicReference<? extends ScheduledFuture<?>>
futureRef) {
+ logger.debug("{} created", this.getClass().getSimpleName());
+ _link = link;
+ _forceConnect = forceConnect;
+ _agent = agent;
+ _futureRef = futureRef;
+ }
+
+ private void cancel() {
+ logger.debug("Cancelling future");
+ Optional.ofNullable(_futureRef.get())
+ .filter(Predicate.not(ScheduledFuture::isCancelled))
+ .ifPresent(future -> future.cancel(true));
+ logger.debug("Cancelled future");
+ }
+
+ @Override
+ public void run() {
+ try {
+ logger.debug("Running {}", getClass().getSimpleName());
+ runInternal();
+ } catch (Exception e) {
+ logger.error("Failed to run {}", getClass().getSimpleName(),
e);
+ }
+ }
+
+ private void runInternal() {
+ ServerAttache attache = (ServerAttache) _link.attachment();
+ if (attache == null || attache.getLink() == null) {
+ cancel();
+ return;
+ }
+
+ AgentConnectStatusAnswer answer;
+ try {
+ answer = getAgentConnectStatusAnswer(attache);
+ } catch (IOException e) {
+ cancel();
+ logger.error("The connection to {} interrupted, restarting the
whole process", _link, e);
+ _agent.getRequestHandler().submit(() ->
_agent.reconnect(_link, null, _forceConnect));
+ return;
+ }
+ if (answer == null) {
+ logger.warn("Received empty agent connect status answer, will
retry later");
+ return;
+ }
+ Boolean lockAvailable = answer.isLockAvailable();
+ Status status = answer.getHostStatus();
+ if (Boolean.TRUE.equals(lockAvailable)) {
+ // send startup command here
+ logger.info("There is no lock and Host status is {}", status);
+ try {
+ sendStartupCommand(_link, _forceConnect);
+ logger.debug("Sending startup command to {} finished",
_link);
+ cancel();
+ logger.debug("Unscheduled {}", getClass().getSimpleName());
+ } catch (RuntimeException e) {
+ logger.error("Failed to send startup command to {}",
_link, e);
+ } catch (IOException e) {
+ cancel();
+ logger.error("The connection to {} interrupted, restarting
the whole process", _link, e);
+ _agent.getRequestHandler().submit(() ->
_agent.reconnect(_link, null, _forceConnect));
+ }
+ } else {
+ logger.info("There is lock and Host status is {}, will retry
later", status);
+ }
+ }
+
+ private AgentConnectStatusAnswer
getAgentConnectStatusAnswer(ServerAttache attache) throws IOException {
+ AgentConnectStatusCommand command =
_agent.setupAgentConnectStatusCommand(new AgentConnectStatusCommand());
+ var commands = new Command[]{command};
+ try {
+ return send(attache, commands, AgentConnectStatusAnswer.class,
DEFAULT_ASYNC_COMMAND_TIMEOUT_SEC);
+ } catch (RuntimeException e) {
+ String commandName = commands[0].getClass().getSimpleName();
+ logger.error("Failed to retrieve {}, will retry later",
commandName, e);
+ return null;
+ }
+ }
+
+ public void sendStartupCommand(Link link, boolean connectionTransfer)
throws IOException {
+ ServerAttache attache = (ServerAttache) link.attachment();
+ if (attache == null || attache.getLink() == null) {
+ return;
+ }
+ ServerResource serverResource = _agent.getResource();
+ StartupCommand[] startup = serverResource.initialize();
+ if (ArrayUtils.isEmpty(startup)) {
Review Comment:
HostStatusTask.sendStartupCommand() ignores the connectionTransfer flag when
building StartupCommand[] (it calls serverResource.initialize() instead of
initialize(connectionTransfer)). Several ServerResource implementations
override initialize(boolean) and may rely on this flag during initialization,
not just for setting StartupCommand.connectionTransferred.
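A sketch of the difference, with a stand-in interface instead of
ServerResource. ResourceSketch, TransferAwareResource, and StartupSenderSketch
are hypothetical names, and Strings stand in for StartupCommand. It only
illustrates that forwarding the flag reaches an overridden initialize(boolean),
while the no-arg call does not.

```java
// Hypothetical stand-in for a resource with two initialize variants. The real
// ServerResource interface is not reproduced here.
interface ResourceSketch {
    String[] initialize();

    // Default keeps flag-unaware implementations working.
    default String[] initialize(boolean connectionTransfer) {
        return initialize();
    }
}

// An implementation whose initialization depends on the flag.
final class TransferAwareResource implements ResourceSketch {
    @Override
    public String[] initialize() {
        return initialize(false);
    }

    @Override
    public String[] initialize(boolean connectionTransfer) {
        return new String[] {connectionTransfer ? "startup(transfer)" : "startup(full)"};
    }
}

final class StartupSenderSketch {

    static String[] buildStartup(ResourceSketch resource, boolean connectionTransfer) {
        // Forward the flag rather than calling the no-arg overload.
        return resource.initialize(connectionTransfer);
    }

    public static void main(String[] args) {
        System.out.println(buildStartup(new TransferAwareResource(), true)[0]);
    }
}
```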
##########
agent/src/main/java/com/cloud/agent/Agent.java:
##########
@@ -623,98 +559,351 @@ public Task create(final Task.Type type, final Link
link, final byte[] data) {
return new ServerHandler(type, link, data);
}
- protected void reconnect(final Link link) {
- reconnect(link, null, false);
+ protected void closeAndTerminateLink(Link link) {
+ Optional.ofNullable(link)
+ .map(Link::attachment)
+ .filter(ServerAttache.class::isInstance)
+ .map(ServerAttache.class::cast)
+ .ifPresentOrElse(ServerAttache::disconnect, () -> {
+ if (link != null) {
+ link.close();
+ link.terminated();
+ }
+ });
}
- protected void reconnect(final Link link, String preferredMSHost, boolean
forTransfer) {
- if (!(forTransfer || reconnectAllowed)) {
- logger.debug("Reconnect requested but it is not allowed {}", () ->
getLinkLog(link));
+ protected void stopAndCleanupConnection() {
+ if (connection == null) {
return;
}
- cancelStartupTask();
- closeAndTerminateLink(link);
- closeAndTerminateLink(this.link);
- setLink(null);
- cancelTasks();
- serverResource.disconnected();
- logger.info("Lost connection to host: {}. Attempting reconnection
while we still have {} commands in progress.", shell.getConnectedHost(),
commandsInProgress.get());
- stopAndCleanupConnection(true);
- String host = preferredMSHost;
- if (org.apache.commons.lang3.StringUtils.isBlank(host)) {
- host = shell.getNextHost();
- }
- List<String> avoidMSHostList = shell.getAvoidHosts();
- do {
- if (CollectionUtils.isEmpty(avoidMSHostList) ||
!avoidMSHostList.contains(host)) {
- connection = new NioClient(getAgentName(), host,
shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this);
- logger.info("Reconnecting to host: {}", host);
- try {
- connection.start();
- } catch (final NioConnectionException e) {
- logger.info("Attempted to re-connect to the server, but
received an unexpected exception, trying again...", e);
- stopAndCleanupConnection(false);
- }
+ NioConnection connection = this.connection;
+ connection.stop();
+ try {
+ connection.cleanUp();
+ } catch (final IOException e) {
+ logger.warn("Fail to clean up old connection", e);
+ }
+
+ try {
+ while (connection.isStartup()) {
+ logger.debug("Waiting for connection graceful stop");
+ shell.getBackoffAlgorithm().waitBeforeRetry();
+ connection.stop();
}
- shell.getBackoffAlgorithm().waitBeforeRetry();
- host = shell.getNextHost();
- } while (!connection.isStartup());
- shell.updateConnectedHost(((NioClient)connection).getHost());
- logger.info("Connected to the host: {}", shell.getConnectedHost());
+ } catch (Exception e) {
+ logger.warn("Failed to gracefully stop connection", e);
+ }
+ logger.debug("Connection stopped");
+ }
+
+ /**
+ * Select the host to reconnect to based on priority:
+ * 1. preferredHost if defined and not blank
+ * 2. Link's socket address IP if available and not null
+ * 3. shell.getNextHost() if the above two options are not met
+ *
+ * @param preferredHost the preferred host to connect to
+ * @param link the current link which may contain socket address
information
+ * @return the host to connect to
+ */
+ protected String selectReconnectionHost(String preferredHost, Link link) {
+ return Optional.ofNullable(preferredHost)
+ .filter(org.apache.commons.lang3.StringUtils::isNotBlank)
+ .orElseGet(() -> Optional.ofNullable(link)
+ .map(Link::getSocketAddress)
+ .map(InetSocketAddress::getAddress)
+ .map(InetAddress::getHostAddress)
+ .orElseGet(shell::getNextHost));
}
- protected void closeAndTerminateLink(final Link link) {
- if (link == null) {
+ /**
+ * Reconnect to Management Server.
+ *
+ * @param link - connection holder
+ * @param preferredHost - if defined, reconnect will be performed to this
Host first,
+ * otherwise will be used {@link
IAgentShell#getNextHost()}
+ * @param forceReconnect - expected to be true if called by {@link
MigrateAgentConnectionCommand},
+ * this is only "switch Management Server", it does
not perform full Host Connect process.
+ */
+ protected void reconnect(Link link, String preferredHost, boolean
forceReconnect) {
+ if (!reconnectLock.compareAndSet(false, true)) {
+ logger.warn("Reconnect is already running, exiting");
return;
}
- link.close();
- link.terminated();
+ String requestedLink =
Optional.ofNullable(link).map(Link::toString).orElse("N/A");
+ String currentLink =
Optional.ofNullable(this.link).map(Link::toString).orElse("N/A");
+ logger.info("Reconnect info: provided link: {}, agent link: {},
preferred host: {}, force" +
+ " reconnect: {}", requestedLink, currentLink, preferredHost,
forceReconnect);
+
+ try {
+ logger.debug("Obtained reconnect lock");
+ if (!(forceReconnect || reconnectAllowed)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reconnect requested but it is not allowed
{}", link);
+ }
+ return;
+ }
+
+ if (isReconnectStormDetected(link, preferredHost, requestedLink,
currentLink)) {
+ return;
+ }
+
+ cleanupConnectionBeforeReconnect(link);
+ // start with preferred host
+ String host = selectReconnectionHost(preferredHost, link);
+
+ String hostLog = LogUtils.getHostLog(host, shell.getPort());
+ List<String> avoidMsHostList =
Optional.ofNullable(shell.getAvoidHosts()).orElseGet(List::of);
+ // pointer to the first element of "refuse loop"
+ AtomicReference<String> firstRefuseLoopHostRef = new
AtomicReference<>(null);
+ // to break deadlock where "non-avoid" MS Hosts are down and only
"avoid" are up
+ AtomicBoolean ignoreAvoidMsHostListRef = new AtomicBoolean(false);
+ do {
+ AtomicBoolean skipTimeoutRef = new AtomicBoolean(false);
+ String parentLogContextId = (String)
ThreadContext.get("logcontextid");
+ if (parentLogContextId != null) {
+ ThreadContext.put("logcontextid-parent",
parentLogContextId);
+ }
+ ThreadContext.put("logcontextid",
UuidUtils.first(UUID.randomUUID().toString()));
+ if (ignoreAvoidMsHostListRef.get() ||
!avoidMsHostList.contains(host)) {
+ connection = new NioClient(getAgentName(), host,
shell.getPort(), shell.getWorkers(),
+ shell.getSslHandshakeTimeout(), this);
+ logger.info("Reconnecting to host: {}", hostLog);
+ try {
+ connection.start();
+ // successfully connected, skip the rest
+ continue;
+ } catch (Exception e) {
+ logReconnectionFailure(e, hostLog);
+
+ try {
+ stopAndCleanupConnection();
+ } catch (Exception ex) {
+ logger.warn("Got an exception during stop and
cleanup connection", e);
+ }
+
+ updateRefuseLoopState(e, host, firstRefuseLoopHostRef,
ignoreAvoidMsHostListRef, skipTimeoutRef);
+ }
+ } else {
+ logger.debug("Next host {} is in avoid list, skipped",
hostLog);
+ if
(org.apache.commons.lang3.StringUtils.isBlank(preferredHost)) {
+ logHostLists(avoidMsHostList);
+ skipTimeoutRef.set(true);
+ }
+ }
+ if (!skipTimeoutRef.get()) {
+ shell.getBackoffAlgorithm().waitBeforeRetry();
+ }
+ host = shell.getNextHost();
+ hostLog = LogUtils.getHostLog(host, shell.getPort());
+ logger.debug("Next host to connect: {}", hostLog);
+ } while (!connection.isStartup());
+ // successfully connected
+ shell.updateConnectedHost(((NioClient) connection).getHost());
+ String msg = String.format("Connected to the host: %s (%s)",
shell.getConnectedHost(), this.link);
+ logger.info(msg);
+ } finally {
+ reconnectLock.set(false);
+ logger.debug("Removed reconnect lock");
+ }
}
- protected void stopAndCleanupConnection(boolean waitForStop) {
- if (connection == null) {
- return;
+ /**
+ * Handles "Connection refused" loop detection and determines if backoff
timeout should be skipped.
+ * Manages refuse loop state to detect when all management servers have
been tried and need
+ * to ignore avoid list to prevent deadlock.
+ *
+ * @param e the exception from connection attempt
+ * @param host the current host being attempted
+ * @param firstRefuseLoopHostRef reference to first host in refuse loop
(modified by this method)
+ * @param ignoreAvoidMsHostListRef flag to ignore avoid list (modified by
this method)
+ * @return true if timeout should be skipped (connection refused), false
otherwise
+ */
+ private void updateRefuseLoopState(Exception e, String host,
AtomicReference<String> firstRefuseLoopHostRef, AtomicBoolean
ignoreAvoidMsHostListRef, AtomicBoolean skipTimeoutRef) {
+ // we are skipping timeout for "Connection refused" to not waste time
on down MS
+ boolean skipTimeout = Optional.ofNullable(e.getCause())
+ .filter(ConnectException.class::isInstance)
+ .map(Throwable::getMessage)
+ .filter(CONNECTION_REFUSED_MSG::equalsIgnoreCase)
+ .isPresent();
+ skipTimeoutRef.set(skipTimeout);
+ String firstRefuseLoopHost = firstRefuseLoopHostRef.get();
+ // for each "Connection refused" (maybe need to have a copy of
variable with better name)
+ // start "refuse loop"
+ if (skipTimeout && firstRefuseLoopHost == null) {
+ firstRefuseLoopHostRef.set(host);
+ ignoreAvoidMsHostListRef.set(false);
+ logger.debug("Started refuse loop for host {}",
firstRefuseLoopHost);
+ // closed "refuse loop"
+ } else if (skipTimeout && firstRefuseLoopHost.equalsIgnoreCase(host)) {
+ ignoreAvoidMsHostListRef.set(true);
+ logger.debug("Closed refuse loop for host {}",
firstRefuseLoopHost);
+ // got non "refuse" related issue, break "refuse loop"
+ } else if (!skipTimeout && (firstRefuseLoopHostRef != null ||
ignoreAvoidMsHostListRef.get())) {
+ logger.debug("Broke refuse loop for host {} by {}",
firstRefuseLoopHost, host);
+ firstRefuseLoopHostRef.set(null);
+ ignoreAvoidMsHostListRef.set(false);
}
- connection.stop();
+ }
+
+ /**
+ * Logs reconnection failure with appropriate level based on rejection
reason.
+ * If connection was rejected due to max concurrent connections limit
(Broken pipe),
+ * logs as warning. Otherwise logs as info.
+ *
+ * @param e the exception that occurred during reconnection attempt
+ * @param hostLog the formatted host log string for logging
+ */
+ private void logReconnectionFailure(Exception e, String hostLog) {
+ // check if got NIO Connection exception, caused by IO Exception
"Broken pipe"
+ boolean rejectedByMs =
Optional.of(e).filter(NioConnectionException.class::isInstance)
+ .map(Exception::getCause)
+ .filter(IOException.class::isInstance)
+ .map(IOException.class::cast)
+ .map(IOException::getMessage)
+ .filter(BROKEN_PIPE_MSG::equalsIgnoreCase)
+ .isPresent();
+ if (rejectedByMs) {
+ logger.warn("Attempted to re-connect to {}, but rejected" +
+ " due to 'agent.max.concurrent.new.connections' reached
limit," +
+ " will try again", hostLog, e);
+ } else {
+ logger.info("Attempted to re-connect to {}, but got exception," +
+ " will try again", hostLog, e);
+ }
+ }
+
+ /**
+ * Logs all management server host lists for debugging reconnection logic.
+ * Outputs defined hosts, hosts to avoid, and calculated available hosts.
+ *
+ * @param avoidMsHostList list of management server hosts to avoid during
reconnection
+ */
+ private void logHostLists(List<String> avoidMsHostList) {
+ logger.debug("Preferred host is not defined");
try {
- connection.cleanUp();
- } catch (final IOException e) {
- logger.warn("Fail to clean up old connection. {}", e);
+ List<String> hostsList = Optional.ofNullable(shell.getHosts())
+ .map(Arrays::asList)
+ .orElseGet(List::of);
+
+ List<String> hostsShortList = new ArrayList<>(hostsList);
+ hostsShortList.removeAll(avoidMsHostList);
+
+ logger.info("Defined hosts: {} Avoid hosts: {} Available hosts:
{}",
+ String.join(", ", hostsList), String.join(", ",
avoidMsHostList), String.join(", ", hostsShortList));
+ } catch (Exception e) {
+ logger.warn("Failed to calculate next host logic", e);
}
- if (!waitForStop) {
- return;
+ }
+
+ /**
+ * Cleans up current connection state before attempting reconnection.
+ * Stops host connect process, terminates links, cancels scheduled tasks,
+ * notifies server resource about disconnection, and resets connection
tracking.
+ *
+ * @param link the link that triggered reconnection
+ */
+ private void cleanupConnectionBeforeReconnect(Link link) {
+ String lastConnectedHost = shell.getConnectedHost();
+ try {
+ // reset Host status track and Startup process initiating
+ logger.debug("Stopping Host Connect process");
+ hostConnectProcess.stop();
+ closeAndTerminateLink(link);
+ closeAndTerminateLink(this.link);
+ setLink(null);
+ cancelTasks();
+ serverResource.disconnected();
+ stopAndCleanupConnection();
+ shell.updateConnectedHost(null);
+ } catch (Exception ex) {
+ logger.error("Failed to cleanup previous connection", ex);
+ }
+ logger.info("Lost connection to host: {}. Attempting reconnection
while we still have" +
+ " {} commands in progress.", lastConnectedHost,
commandsInProgress.get());
+ }
+
+ /**
+ * Detects reconnection storm by checking if the reconnect request is
redundant.
+ * This prevents processing stale reconnection requests for old links when
+ * agent has already established a new connection.
+ *
+ * @param link the link requesting reconnection
+ * @param preferredHost the preferred host to reconnect to (may be null)
+ * @param requestedLink string representation of the requested link for
logging
+ * @param currentLink string representation of the current agent link for
logging
+ * @return true if reconnection storm is detected and request should be
skipped, false otherwise
+ */
+ private boolean isReconnectStormDetected(Link link, String preferredHost,
String requestedLink, String currentLink) {
+ logger.debug("Calling storm guard");
+ boolean reconnectForCurrentLink = link == this.link;
+ boolean currentLinkTerminated = this.link != null &&
this.link.isTerminated();
+ boolean reconnectForNewHost = this.hostname != null &&
this.hostname.equals(preferredHost);
+ // if none of the above is true
+ boolean stormDetected = ! (reconnectForCurrentLink ||
currentLinkTerminated || reconnectForNewHost);
Review Comment:
isReconnectStormDetected() treats a reconnect as "not a storm" when
preferredHost equals this.hostname (the agent's hostname). preferredHost is a
Management Server host, so this condition is unrelated and can incorrectly
cause legitimate reconnects to be skipped (or storms to be misdetected).
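One possible reading of the intent, offered as an assumption rather than the
author's design: "reconnect for new host" means the preferred Management Server
host differs from the one the agent is currently connected to. In the sketch
below, StormGuardSketch and the connectedMsHost parameter are hypothetical.

```java
// Hypothetical sketch of the storm guard, comparing the preferred Management
// Server host with the currently connected one instead of the agent hostname.
final class StormGuardSketch {

    static boolean isStorm(boolean reconnectForCurrentLink, boolean currentLinkTerminated,
                           String connectedMsHost, String preferredHost) {
        // Assumption: a request counts as "for a new host" only when a
        // preferred host is given and it is not the one already connected.
        boolean reconnectForNewHost = preferredHost != null
                && !preferredHost.equals(connectedMsHost);
        return !(reconnectForCurrentLink || currentLinkTerminated || reconnectForNewHost);
    }

    public static void main(String[] args) {
        // Stale link, live connection, no new target: treated as a storm.
        System.out.println(isStorm(false, false, "ms1", null));
        // Same situation but a different Management Server is requested.
        System.out.println(isStorm(false, false, "ms1", "ms2"));
    }
}
```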
##########
utils/src/main/java/com/cloud/utils/LogUtils.java:
##########
@@ -106,4 +108,29 @@ public static String logGsonWithoutException(String
formatMessage, Object ... ob
return errorMsg;
}
}
+
+ /**
+ * Generates address entry for log in format of {@code
IP_ADDRESS/HOST_NAME:PORT}, where {@code HOST_NAME} is
+ * optional if it cannot be resolved.
+ *
+ * @param address IP address or Host name
+ * @param port port
+ */
+ public static String getHostLog(String address, Integer port) {
+ try {
+ InetAddress inetAddress = InetAddress.getByName(address);
+ String hostName = inetAddress.getHostName();
+ String ipAddress = inetAddress.getHostAddress();
+ if (port == null) {
+ return String.format("%s/%s", ipAddress, hostName);
+ }
+ return String.format("%s/%s:%s", ipAddress, hostName, port);
+ } catch (UnknownHostException e) {
+ LOGGER.warn("Failed to resolve name for address {}", address, e);
+ }
+ if (port == null) {
+ return address;
+ }
+ return String.format("%s:%s", address, port);
+ }
Review Comment:
LogUtils.getHostLog() performs DNS lookups (InetAddress.getByName +
getHostName). This is called during connection/reconnection logging (e.g.,
NioClient.init and Agent.reconnect loop), so slow/blocked DNS can materially
delay reconnect attempts. Consider avoiding reverse lookups and just logging
the resolved IP plus the original host string.
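A lookup-free variant could look like the sketch below. HostLogSketch is a
hypothetical name, and the sketch goes one step further than the suggestion by
skipping the forward lookup too, so it logs only the original host string and
the port. InetSocketAddress.createUnresolved() does not attempt resolution, and
getHostString() returns the stored string without a reverse lookup.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch: formats a host entry for logs without touching DNS.
final class HostLogSketch {

    static String getHostLog(String address, Integer port) {
        if (port == null) {
            return address;
        }
        // createUnresolved() validates the port range (0-65535) and rejects a
        // null host with IllegalArgumentException, but never calls the resolver.
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved(address, port);
        return unresolved.getHostString() + ":" + unresolved.getPort();
    }

    public static void main(String[] args) {
        System.out.println(getHostLog("ms1.example.org", 8250));
        System.out.println(getHostLog("10.0.0.5", null));
    }
}
```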