This is an automated email from the ASF dual-hosted git repository.

smolnar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af3f9ff  AMBARI-24714. Avoid multi-threading and caching issues when aborting requests and processing agent reports at the same time (#2411)
af3f9ff is described below

commit af3f9ff0b367340c432431384d5d805a72f8495a
Author: Sandor Molnar <[email protected]>
AuthorDate: Thu Oct 4 07:07:46 2018 +0200

    AMBARI-24714. Avoid multi-threading and caching issues when aborting requests and processing agent reports at the same time (#2411)
---
 .../server/actionmanager/ActionDBAccessorImpl.java | 171 +++++++++++++--------
 1 file changed, 103 insertions(+), 68 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 94aee41..5c1fa66 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.ambari.annotations.TransactionalLock;
 import org.apache.ambari.annotations.TransactionalLock.LockArea;
@@ -153,6 +155,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   private Cache<Long, HostRoleCommand> hostRoleCommandCache;
   private long cacheLimit; //may be exceeded to store tasks from one request
 
+  //We do lock for writing/reading when HRCs are manipulated/read by different 
threads
+  //For instance we do lock for writing when aborting all HRCs of a request to 
avoid reading the same HRCs by agent report processor (so that we lock there 
for reading too)
+  private final ReadWriteLock hrcOperationsLock = new ReentrantReadWriteLock();
+
   @Inject
   public ActionDBAccessorImpl(@Named("executionCommandCacheSize") long 
cacheLimit,
                               AmbariEventPublisher eventPublisher) {
@@ -213,32 +219,35 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public Collection<HostRoleCommandEntity> abortOperation(long requestId) {
-    Collection<HostRoleCommandEntity> abortedHostRoleCommands = 
Collections.emptyList();
-    long now = System.currentTimeMillis();
-
-    // only request commands which actually need to be aborted; requesting all
-    // commands here can cause OOM problems during large requests like upgrades
-    List<HostRoleCommandEntity> commands = 
hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
-        HostRoleStatus.SCHEDULED_STATES);
-
-    for (HostRoleCommandEntity command : commands) {
-      command.setStatus(HostRoleStatus.ABORTED);
-      command.setEndTime(now);
-      LOG.info("Aborting command. Hostname " + command.getHostName()
-          + " role " + command.getRole()
-          + " requestId " + command.getRequestId()
-          + " taskId " + command.getTaskId()
-          + " stageId " + command.getStageId());
-
-      auditLog(command, requestId);
-    }
+    try {
+      hrcOperationsLock.writeLock().lock();
+      Collection<HostRoleCommandEntity> abortedHostRoleCommands = new 
ArrayList<>();
+      long now = System.currentTimeMillis();
+
+      // only request commands which actually need to be aborted; requesting 
all
+      // commands here can cause OOM problems during large requests like 
upgrades
+      List<HostRoleCommandEntity> commands = 
hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
+          HostRoleStatus.SCHEDULED_STATES);
+
+      for (HostRoleCommandEntity command : commands) {
+        command.setStatus(HostRoleStatus.ABORTED);
+        command.setEndTime(now);
+        abortedHostRoleCommands.add(hostRoleCommandDAO.merge(command));
+        LOG.info("Aborted command. Hostname " + command.getHostName()
+        + " role " + command.getRole()
+        + " requestId " + command.getRequestId()
+        + " taskId " + command.getTaskId()
+        + " stageId " + command.getStageId());
+
+        auditLog(command, requestId);
+        cacheHostRoleCommand(hostRoleCommandFactory.createExisting(command));
+      }
 
-    // no need to merge if there's nothing to merge
-    if (!commands.isEmpty()) {
-      abortedHostRoleCommands = hostRoleCommandDAO.mergeAll(commands);
+      endRequest(requestId);
+      return abortedHostRoleCommands;
+    } finally {
+      hrcOperationsLock.writeLock().unlock();
     }
-    endRequest(requestId);
-    return abortedHostRoleCommands;
   }
 
   /* (non-Javadoc)
@@ -514,12 +523,21 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
     List<Long> requestsToCheck = new ArrayList<>();
 
-    List<HostRoleCommandEntity> commandEntities = 
hostRoleCommandDAO.findByPKs(taskReports.keySet());
-    List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<>();
+    List<HostRoleCommandEntity> commandEntities;
+    try {
+      hrcOperationsLock.readLock().lock();
+      commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
+    } finally {
+      hrcOperationsLock.readLock().unlock();
+    }
     for (HostRoleCommandEntity commandEntity : commandEntities) {
       CommandReport report = taskReports.get(commandEntity.getTaskId());
       HostRoleStatus existingTaskStatus = commandEntity.getStatus();
       HostRoleStatus reportedTaskStatus = 
HostRoleStatus.valueOf(report.getStatus());
+      if (!existingTaskStatus.isCompletedState()) {
+        //sometimes JPA cache returns a task with incorrect state (i.e. it was 
aborted just before we queried above); reading it from the DB for the sake of 
integrity
+        //existingTaskStatus = 
hostRoleCommandDAO.refreshHostRoleCommand(commandEntity).getStatus();
+      }
       if (!existingTaskStatus.isCompletedState() || existingTaskStatus == 
HostRoleStatus.ABORTED) {
         // if FAILED and marked for holding then set reportedTaskStatus = 
HOLDING_FAILED
         if (reportedTaskStatus == HostRoleStatus.FAILED && 
commandEntity.isRetryAllowed()) {
@@ -537,6 +555,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         }
 
         if (!existingTaskStatus.isCompletedState()) {
+          LOG.debug("Setting status from {} to {} for {}", existingTaskStatus, 
reportedTaskStatus, commandEntity.getTaskId());
           commandEntity.setStatus(reportedTaskStatus);
         }
         commandEntity.setStdOut(report.getStdOut() == null ? null : 
report.getStdOut().getBytes());
@@ -544,9 +563,18 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
         commandEntity.setStructuredOut(report.getStructuredOut() == null ? 
null :
             report.getStructuredOut().getBytes());
         commandEntity.setExitcode(report.getExitCode());
-        if 
(HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
+        if (commandEntity.getStatus().isCompletedState()) {
           commandEntity.setEndTime(now);
+        }
+
+        try {
+          hrcOperationsLock.writeLock().lock();
+          hostRoleCommandDAO.merge(commandEntity);
+        } finally {
+          hrcOperationsLock.writeLock().unlock();
+        }
 
+        if (commandEntity.getStatus().isCompletedState()) {
           String actionId = report.getActionId();
           long[] requestStageIds = StageUtils.getRequestStage(actionId);
           long requestId = requestStageIds[0];
@@ -556,19 +584,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
             requestsToCheck.add(requestId);
           }
         }
-        commandEntitiesToMerge.add(commandEntity);
       } else {
        LOG.warn(String.format("Request for invalid transition of host role 
command status received for task id %d from " +
-           "agent: %s -> 
%s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus));
+           "agent: %s -> %s",commandEntity.getTaskId(), existingTaskStatus, 
reportedTaskStatus));
       }
     }
 
-    // no need to merge if there's nothing to merge
-    if (!commandEntitiesToMerge.isEmpty()) {
-      hostRoleCommandDAO.mergeAll(commandEntitiesToMerge);
-    }
-
-
     for (Long requestId : requestsToCheck) {
       endRequestIfCompleted(requestId);
     }
@@ -716,41 +737,50 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     }
 
     List<HostRoleCommand> commands = new ArrayList<>();
-
-    Map<Long, HostRoleCommand> cached = 
hostRoleCommandCache.getAllPresent(taskIds);
-    commands.addAll(cached.values());
-
-    List<Long> absent = new ArrayList<>();
-    absent.addAll(taskIds);
-    absent.removeAll(cached.keySet());
-
-    if (!absent.isEmpty()) {
-      boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
-
-      for (HostRoleCommandEntity commandEntity : 
hostRoleCommandDAO.findByPKs(absent)) {
-        HostRoleCommand hostRoleCommand = 
hostRoleCommandFactory.createExisting(commandEntity);
-        commands.add(hostRoleCommand);
-        if (allowStore) {
-          switch (hostRoleCommand.getStatus()) {
-            case ABORTED:
-            case COMPLETED:
-            case TIMEDOUT:
-            case FAILED:
-              hostRoleCommandCache.put(hostRoleCommand.getTaskId(), 
hostRoleCommand);
-              break;
-          }
+    try {
+      hrcOperationsLock.readLock().lock();
+      Map<Long, HostRoleCommand> cached = 
hostRoleCommandCache.getAllPresent(taskIds);
+      commands.addAll(cached.values());
+
+      List<Long> absent = new ArrayList<>();
+      absent.addAll(taskIds);
+      absent.removeAll(cached.keySet());
+
+      if (!absent.isEmpty()) {
+        for (HostRoleCommandEntity commandEntity : 
hostRoleCommandDAO.findByPKs(absent)) {
+          HostRoleCommand hostRoleCommand = 
hostRoleCommandFactory.createExisting(commandEntity);
+          commands.add(hostRoleCommand);
+          cacheHostRoleCommand(hostRoleCommand);
         }
       }
+      Collections.sort(commands, new Comparator<HostRoleCommand>() {
+        @Override
+        public int compare(HostRoleCommand o1, HostRoleCommand o2) {
+          return (int) (o1.getTaskId()-o2.getTaskId());
+        }
+      });
+    } finally {
+      hrcOperationsLock.readLock().unlock();
     }
-    Collections.sort(commands, new Comparator<HostRoleCommand>() {
-      @Override
-      public int compare(HostRoleCommand o1, HostRoleCommand o2) {
-        return (int) (o1.getTaskId()-o2.getTaskId());
-      }
-    });
     return commands;
   }
 
+  private void cacheHostRoleCommand(HostRoleCommand hostRoleCommand) {
+    if (hostRoleCommandCache.size() <= cacheLimit) {
+      switch (hostRoleCommand.getStatus()) {
+      case ABORTED:
+      case COMPLETED:
+      case TIMEDOUT:
+      case FAILED:
+        hostRoleCommandCache.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+        break;
+      default:
+        // NOP
+        break;
+      }
+    }
+  }
+
   @Override
   public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, 
String role, HostRoleStatus status) {
     return 
getTasks(hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role, 
status));
@@ -763,11 +793,16 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
   @Override
   public HostRoleCommand getTask(long taskId) {
-    HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int) 
taskId);
-    if (commandEntity == null) {
-      return null;
+    try {
+      hrcOperationsLock.readLock().lock();
+      HostRoleCommandEntity commandEntity = 
hostRoleCommandDAO.findByPK(taskId);
+      if (commandEntity == null) {
+        return null;
+      }
+      return hostRoleCommandFactory.createExisting(commandEntity);
+    } finally {
+      hrcOperationsLock.readLock().unlock();
     }
-    return hostRoleCommandFactory.createExisting(commandEntity);
   }
 
   @Override

Reply via email to