This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new af3f9ff AMBARI-24714. Avoid multi-threading and caching issues when
aborting requests and processing agent reports at the same time (#2411)
af3f9ff is described below
commit af3f9ff0b367340c432431384d5d805a72f8495a
Author: Sandor Molnar <[email protected]>
AuthorDate: Thu Oct 4 07:07:46 2018 +0200
AMBARI-24714. Avoid multi-threading and caching issues when aborting
requests and processing agent reports at the same time (#2411)
---
.../server/actionmanager/ActionDBAccessorImpl.java | 171 +++++++++++++--------
1 file changed, 103 insertions(+), 68 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 94aee41..5c1fa66 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
@@ -153,6 +155,10 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
private Cache<Long, HostRoleCommand> hostRoleCommandCache;
private long cacheLimit; //may be exceeded to store tasks from one request
+ //We do lock for writing/reading when HRCs are manipulated/read by different threads
+ //For instance we do lock for writing when aborting all HRCs of a request to avoid reading the same HRCs by agent report processor (so that we lock there for reading too)
+ private final ReadWriteLock hrcOperationsLock = new ReentrantReadWriteLock();
+
@Inject
public ActionDBAccessorImpl(@Named("executionCommandCacheSize") long
cacheLimit,
AmbariEventPublisher eventPublisher) {
@@ -213,32 +219,35 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
*/
@Override
public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
- Collection<HostRoleCommandEntity> abortedHostRoleCommands =
Collections.emptyList();
- long now = System.currentTimeMillis();
-
- // only request commands which actually need to be aborted; requesting all
- // commands here can cause OOM problems during large requests like upgrades
- List<HostRoleCommandEntity> commands =
hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
- HostRoleStatus.SCHEDULED_STATES);
-
- for (HostRoleCommandEntity command : commands) {
- command.setStatus(HostRoleStatus.ABORTED);
- command.setEndTime(now);
- LOG.info("Aborting command. Hostname " + command.getHostName()
- + " role " + command.getRole()
- + " requestId " + command.getRequestId()
- + " taskId " + command.getTaskId()
- + " stageId " + command.getStageId());
-
- auditLog(command, requestId);
- }
+ try {
+ hrcOperationsLock.writeLock().lock();
+ Collection<HostRoleCommandEntity> abortedHostRoleCommands = new
ArrayList<>();
+ long now = System.currentTimeMillis();
+
+ // only request commands which actually need to be aborted; requesting all
+ // commands here can cause OOM problems during large requests like upgrades
+ List<HostRoleCommandEntity> commands =
hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
+ HostRoleStatus.SCHEDULED_STATES);
+
+ for (HostRoleCommandEntity command : commands) {
+ command.setStatus(HostRoleStatus.ABORTED);
+ command.setEndTime(now);
+ abortedHostRoleCommands.add(hostRoleCommandDAO.merge(command));
+ LOG.info("Aborted command. Hostname " + command.getHostName()
+ + " role " + command.getRole()
+ + " requestId " + command.getRequestId()
+ + " taskId " + command.getTaskId()
+ + " stageId " + command.getStageId());
+
+ auditLog(command, requestId);
+ cacheHostRoleCommand(hostRoleCommandFactory.createExisting(command));
+ }
- // no need to merge if there's nothing to merge
- if (!commands.isEmpty()) {
- abortedHostRoleCommands = hostRoleCommandDAO.mergeAll(commands);
+ endRequest(requestId);
+ return abortedHostRoleCommands;
+ } finally {
+ hrcOperationsLock.writeLock().unlock();
}
- endRequest(requestId);
- return abortedHostRoleCommands;
}
/* (non-Javadoc)
@@ -514,12 +523,21 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
List<Long> requestsToCheck = new ArrayList<>();
- List<HostRoleCommandEntity> commandEntities =
hostRoleCommandDAO.findByPKs(taskReports.keySet());
- List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<>();
+ List<HostRoleCommandEntity> commandEntities;
+ try {
+ hrcOperationsLock.readLock().lock();
+ commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+ } finally {
+ hrcOperationsLock.readLock().unlock();
+ }
for (HostRoleCommandEntity commandEntity : commandEntities) {
CommandReport report = taskReports.get(commandEntity.getTaskId());
HostRoleStatus existingTaskStatus = commandEntity.getStatus();
HostRoleStatus reportedTaskStatus =
HostRoleStatus.valueOf(report.getStatus());
+ if (!existingTaskStatus.isCompletedState()) {
+ //sometimes JPA cache returns a task with incorrect state (i.e. it was aborted just before we queried above); reading it from the DB for the sake of integrity
+ //existingTaskStatus = hostRoleCommandDAO.refreshHostRoleCommand(commandEntity).getStatus();
+ }
if (!existingTaskStatus.isCompletedState() || existingTaskStatus ==
HostRoleStatus.ABORTED) {
// if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED
if (reportedTaskStatus == HostRoleStatus.FAILED &&
commandEntity.isRetryAllowed()) {
@@ -537,6 +555,7 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
}
if (!existingTaskStatus.isCompletedState()) {
+ LOG.debug("Setting status from {} to {} for {}", existingTaskStatus,
reportedTaskStatus, commandEntity.getTaskId());
commandEntity.setStatus(reportedTaskStatus);
}
commandEntity.setStdOut(report.getStdOut() == null ? null :
report.getStdOut().getBytes());
@@ -544,9 +563,18 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
commandEntity.setStructuredOut(report.getStructuredOut() == null ?
null :
report.getStructuredOut().getBytes());
commandEntity.setExitcode(report.getExitCode());
- if
(HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+ if (commandEntity.getStatus().isCompletedState()) {
commandEntity.setEndTime(now);
+ }
+
+ try {
+ hrcOperationsLock.writeLock().lock();
+ hostRoleCommandDAO.merge(commandEntity);
+ } finally {
+ hrcOperationsLock.writeLock().unlock();
+ }
+ if (commandEntity.getStatus().isCompletedState()) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
long requestId = requestStageIds[0];
@@ -556,19 +584,12 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
requestsToCheck.add(requestId);
}
}
- commandEntitiesToMerge.add(commandEntity);
} else {
LOG.warn(String.format("Request for invalid transition of host role
command status received for task id %d from " +
- "agent: %s ->
%s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
+ "agent: %s -> %s",commandEntity.getTaskId(), existingTaskStatus,
reportedTaskStatus));
}
}
- // no need to merge if there's nothing to merge
- if (!commandEntitiesToMerge.isEmpty()) {
- hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
- }
-
-
for (Long requestId : requestsToCheck) {
endRequestIfCompleted(requestId);
}
@@ -716,41 +737,50 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
}
List<HostRoleCommand> commands = new ArrayList<>();
-
- Map<Long, HostRoleCommand> cached =
hostRoleCommandCache.getAllPresent(taskIds);
- commands.addAll(cached.values());
-
- List<Long> absent = new ArrayList<>();
- absent.addAll(taskIds);
- absent.removeAll(cached.keySet());
-
- if (!absent.isEmpty()) {
- boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-
- for (HostRoleCommandEntity commandEntity :
hostRoleCommandDAO.findByPKs(absent)) {
- HostRoleCommand hostRoleCommand =
hostRoleCommandFactory.createExisting(commandEntity);
- commands.add(hostRoleCommand);
- if (allowStore) {
- switch (hostRoleCommand.getStatus()) {
- case ABORTED:
- case COMPLETED:
- case TIMEDOUT:
- case FAILED:
- hostRoleCommandCache.put(hostRoleCommand.getTaskId(),
hostRoleCommand);
- break;
- }
+ try {
+ hrcOperationsLock.readLock().lock();
+ Map<Long, HostRoleCommand> cached =
hostRoleCommandCache.getAllPresent(taskIds);
+ commands.addAll(cached.values());
+
+ List<Long> absent = new ArrayList<>();
+ absent.addAll(taskIds);
+ absent.removeAll(cached.keySet());
+
+ if (!absent.isEmpty()) {
+ for (HostRoleCommandEntity commandEntity :
hostRoleCommandDAO.findByPKs(absent)) {
+ HostRoleCommand hostRoleCommand =
hostRoleCommandFactory.createExisting(commandEntity);
+ commands.add(hostRoleCommand);
+ cacheHostRoleCommand(hostRoleCommand);
}
}
+ Collections.sort(commands, new Comparator<HostRoleCommand>() {
+ @Override
+ public int compare(HostRoleCommand o1, HostRoleCommand o2) {
+ return (int) (o1.getTaskId()-o2.getTaskId());
+ }
+ });
+ } finally {
+ hrcOperationsLock.readLock().unlock();
}
- Collections.sort(commands, new Comparator<HostRoleCommand>() {
- @Override
- public int compare(HostRoleCommand o1, HostRoleCommand o2) {
- return (int) (o1.getTaskId()-o2.getTaskId());
- }
- });
return commands;
}
+ private void cacheHostRoleCommand(HostRoleCommand hostRoleCommand) {
+ if (hostRoleCommandCache.size() <= cacheLimit) {
+ switch (hostRoleCommand.getStatus()) {
+ case ABORTED:
+ case COMPLETED:
+ case TIMEDOUT:
+ case FAILED:
+ hostRoleCommandCache.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+ break;
+ default:
+ // NOP
+ break;
+ }
+ }
+ }
+
@Override
public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname,
String role, HostRoleStatus status) {
return
getTasks(hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role,
status));
@@ -763,11 +793,16 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
@Override
public HostRoleCommand getTask(long taskId) {
- HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int)
taskId);
- if (commandEntity == null) {
- return null;
+ try {
+ hrcOperationsLock.readLock().lock();
+ HostRoleCommandEntity commandEntity =
hostRoleCommandDAO.findByPK(taskId);
+ if (commandEntity == null) {
+ return null;
+ }
+ return hostRoleCommandFactory.createExisting(commandEntity);
+ } finally {
+ hrcOperationsLock.readLock().unlock();
}
- return hostRoleCommandFactory.createExisting(commandEntity);
}
@Override