Repository: ambari
Updated Branches:
  refs/heads/trunk a384a0e1c -> 949ecd21b


AMBARI-5761. 2000-node cluster testing: during install phase of cluster 
deployment, install tasks were stuck in PENDING state. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/949ecd21
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/949ecd21
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/949ecd21

Branch: refs/heads/trunk
Commit: 949ecd21b4001ccd763ad82985ec6efd96eef6ff
Parents: a384a0e
Author: Myroslav Papirkovskyy <[email protected]>
Authored: Thu May 29 18:13:52 2014 +0300
Committer: Myroslav Papirkovskyy <[email protected]>
Committed: Thu May 29 18:15:40 2014 +0300

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |  11 ++
 .../actionmanager/ActionDBAccessorImpl.java     |  18 +++
 .../server/actionmanager/ActionScheduler.java   | 131 +++++++++++++++++--
 .../AmbariManagementControllerImpl.java         |   1 +
 .../internal/RequestResourceProvider.java       |   7 +
 .../org/apache/ambari/server/state/Cluster.java |  11 ++
 .../server/state/cluster/ClusterImpl.java       |  33 +++++
 7 files changed, 200 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index 86ebecf..fd3e039 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager;
 
 import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
 
 import java.util.Collection;
 import java.util.List;
@@ -111,6 +112,16 @@ public interface ActionDBAccessor {
   public long getLastPersistedRequestIdWhenInitialized();
 
   /**
+   * Bulk update scheduled commands
+   */
+  void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands);
+
+  /**
+   * Bulk abort commands
+   */
+  void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands);
+
+  /**
    * Updates scheduled stage.
    */
   public void hostRoleScheduled(Stage s, String hostname, String roleStr);

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 7089276..375794d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -411,6 +412,23 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
     return requestId;
   }
 
+
+  @Override
+  @Transactional
+  public void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands) {
+    for (ExecutionCommand command : commands) {
+      hostRoleScheduled(s, command.getHostname(), command.getRole());
+    }
+  }
+
+  @Override
+  @Transactional
+  public void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands) {
+    for (ExecutionCommand command : commands) {
+      abortHostRole(command.getHostname(), s.getRequestId(), s.getStageId(), 
command.getRole());
+    }
+  }
+
   @Override
   @Transactional
   public void hostRoleScheduled(Stage s, String hostname, String roleStr) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index bf9ec79..2030163 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -21,12 +21,20 @@ import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -46,16 +54,16 @@ import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import 
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
+
 
 /**
  * This class encapsulates the action scheduler thread.
@@ -164,6 +172,7 @@ class ActionScheduler implements Runnable {
       List<Stage> stages = db.getStagesInProgress();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduler wakes up");
+        LOG.debug("Processing {} in progress stages ", stages.size());
       }
       if (stages == null || stages.isEmpty()) {
         //Nothing to do
@@ -172,15 +181,18 @@ class ActionScheduler implements Runnable {
         }
         return;
       }
-
+      int i_stage = 0;
       for (Stage s : stages) {
         // Check if we can process this stage in parallel with another stages
+        i_stage ++;
 
         long requestId = s.getRequestId();
         // Convert to string to avoid glitches with boxing/unboxing
         String requestIdStr = String.valueOf(requestId);
+        LOG.debug("==> STAGE_i = " + i_stage + "(requestId=" + requestIdStr + 
",StageId=" + s.getStageId() + ")");
         if (runningRequestIds.contains(requestIdStr)) {
           // We don't want to process different stages from the same request 
in parallel
+          LOG.info("==> We don't want to process different stages from the 
same request in parallel" );
           continue;
         } else {
           runningRequestIds.add(requestIdStr);
@@ -233,7 +245,11 @@ class ActionScheduler implements Runnable {
           return;
         }
 
+        List<ExecutionCommand> commandsToStart = new 
ArrayList<ExecutionCommand>();
+        List<ExecutionCommand> commandsToUpdate = new 
ArrayList<ExecutionCommand>();
+
         //Schedule what we have so far
+
         for (ExecutionCommand cmd : commandsToSchedule) {
           if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
             /**
@@ -245,15 +261,46 @@ class ActionScheduler implements Runnable {
              */
             executeServerAction(s, cmd);
           } else {
-            try {
-              scheduleHostRole(s, cmd);
-            } catch (InvalidStateTransitionException e) {
-              LOG.warn("Could not schedule host role " + cmd.toString(), e);
-              db.abortHostRole(cmd.getHostname(), s.getRequestId(), 
s.getStageId(), cmd.getRole());
+            processHostRole(s, cmd, commandsToStart, commandsToUpdate);
+          }
+        }
+
+        LOG.debug("==> Commands to start: {}", commandsToStart.size());
+        LOG.debug("==> Commands to update: {}", commandsToUpdate.size());
+
+        //Multimap is analog of Map<Object, List<Object>> but allows to avoid 
nested loop
+        ListMultimap<String, ServiceComponentHostEvent> eventMap = 
formEventMap(s, commandsToStart);
+        LOG.debug("==> processing {} serviceComponentHostEvents...", 
eventMap.size());
+        List<ServiceComponentHostEvent> failedEvents =
+            
fsmObject.getCluster(s.getClusterName()).processServiceComponentHostEvents(eventMap);
+        LOG.debug("==> {} events failed.", failedEvents.size());
+
+        List<ExecutionCommand> commandsToAbort = new 
ArrayList<ExecutionCommand>();
+
+        for (Iterator<ExecutionCommand> iterator = 
commandsToUpdate.iterator(); iterator.hasNext(); ) {
+          ExecutionCommand cmd = iterator.next();
+          for (ServiceComponentHostEvent event : failedEvents) {
+            if (StringUtils.equals(event.getHostName(), cmd.getHostname()) &&
+                StringUtils.equals(event.getServiceComponentName(), 
cmd.getRole())) {
+              iterator.remove();
+              commandsToAbort.add(cmd);
+              break;
             }
           }
         }
 
+        LOG.debug("==> Scheduling {} tasks...", commandsToUpdate.size());
+        db.bulkHostRoleScheduled(s, commandsToUpdate);
+
+        LOG.debug("==> Aborting {} tasks...", commandsToAbort.size());
+        db.bulkAbortHostRole(s, commandsToAbort);
+
+        LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
+        for (ExecutionCommand cmd : commandsToUpdate) {
+          actionQueue.enqueue(cmd.getHostname(), cmd);
+        }
+        LOG.debug("==> Finished.");
+
         if (! configuration.getParallelStageExecution()) { // If disabled
           return;
         }
@@ -262,6 +309,7 @@ class ActionScheduler implements Runnable {
       requestsInProgress.retainAll(runningRequestIds);
 
     } finally {
+      LOG.debug("Scheduler finished work.");
       unitOfWork.end();
     }
   }
@@ -272,6 +320,8 @@ class ActionScheduler implements Runnable {
    */
   private void executeServerAction(Stage s, ExecutionCommand cmd) {
     try {
+      LOG.trace("Executing server action: request_id={}, stage_id={}, 
task_id={}",
+        s.getRequestId(), s.getStageId(), cmd.getTaskId());
       long now = System.currentTimeMillis();
       String hostName = cmd.getHostname();
       String roleName = cmd.getRole();
@@ -368,6 +418,7 @@ class ActionScheduler implements Runnable {
    */
   private Map<String, RoleStats> processInProgressStage(Stage s,
       List<ExecutionCommand> commandsToSchedule) throws AmbariException {
+    LOG.debug("==> Collecting commands to schedule...");
     // Map to track role status
     Map<String, RoleStats> roleStats = initRoleStats(s);
     long now = System.currentTimeMillis();
@@ -384,12 +435,17 @@ class ActionScheduler implements Runnable {
     for (String host : s.getHosts()) {
       List<ExecutionCommandWrapper> commandWrappers = 
s.getExecutionCommands(host);
       Host hostObj = fsmObject.getHost(host);
-
+      int i_my = 0;
+      LOG.trace("===>host=" + host);
       for(ExecutionCommandWrapper wrapper : commandWrappers) {
         ExecutionCommand c = wrapper.getExecutionCommand();
         String roleStr = c.getRole();
         HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
-
+        i_my ++;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Host task " + i_my + ") id = " + c.getTaskId() + " status 
= " + status.toString() +
+            " (role=" + roleStr + "), roleCommand = "+ c.getRoleCommand());
+        }
         boolean hostDeleted = false;
         if (null != cluster) {
           Service svc = null;
@@ -451,19 +507,23 @@ class ActionScheduler implements Runnable {
             }
 
             // Dequeue command
+            LOG.info("Removing command from queue, host={}, commandId={} ", 
host, c.getCommandId());
             actionQueue.dequeue(host, c.getCommandId());
           } else {
             // reschedule command
             commandsToSchedule.add(c);
+            LOG.trace("===> commandsToSchedule(reschedule)=" + 
commandsToSchedule.size());
           }
         } else if (status.equals(HostRoleStatus.PENDING)) {
           //Need to schedule first time
           commandsToSchedule.add(c);
+          LOG.trace("===>commandsToSchedule(first_time)=" + 
commandsToSchedule.size());
         }
 
         this.updateRoleStats(status, roleStats.get(roleStr));
       }
     }
+    LOG.debug("Collected {} commands to schedule in this wakeup.", 
commandsToSchedule.size());
     return roleStats;
   }
 
@@ -581,6 +641,53 @@ class ActionScheduler implements Runnable {
     return false;
   }
 
+  private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage 
s, List<ExecutionCommand> commands) {
+    ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = 
ArrayListMultimap.create();
+    for (ExecutionCommand cmd : commands) {
+      String hostname = cmd.getHostname();
+      String roleStr = cmd.getRole();
+      if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
+          serviceEventMap.put(cmd.getServiceName(), s.getFsmEvent(hostname, 
roleStr).getEvent());
+      }
+    }
+    return serviceEventMap;
+  }
+
+  private void processHostRole(Stage s, ExecutionCommand cmd, 
List<ExecutionCommand> commandsToStart,
+                               List<ExecutionCommand> commandsToUpdate)
+    throws AmbariException {
+    long now = System.currentTimeMillis();
+    String roleStr = cmd.getRole();
+    String hostname = cmd.getHostname();
+
+    // start time is -1 if host role command is not started yet
+    if (s.getStartTime(hostname, roleStr) < 0) {
+
+      commandsToStart.add(cmd);
+      s.setStartTime(hostname,roleStr, now);
+      s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
+    }
+    s.setLastAttemptTime(hostname, roleStr, now);
+    s.incrementAttemptCount(hostname, roleStr);
+    /** change the hostname in the command for the host itself **/
+    cmd.setHostname(hostsMap.getHostMap(hostname));
+
+
+    //Try to get clusterHostInfo from cache
+    String stagePk = s.getStageId() + "-" + s.getRequestId();
+    Map<String, Set<String>> clusterHostInfo = 
clusterHostInfoCache.getIfPresent(stagePk);
+
+    if (clusterHostInfo == null) {
+      Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
+      clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), 
type);
+      clusterHostInfoCache.put(stagePk, clusterHostInfo);
+    }
+
+    cmd.setClusterHostInfo(clusterHostInfo);
+
+    commandsToUpdate.add(cmd);
+  }
+
   private void scheduleHostRole(Stage s, ExecutionCommand cmd)
       throws InvalidStateTransitionException, AmbariException {
     long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index a80327c..1297f6c 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -1814,6 +1814,7 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
     List<Stage> stages = doStageCreation(requestStages, cluster, 
changedServices, changedComponents,
         changedHosts, requestParameters, requestProperties,
         runSmokeTest, reconfigureClients);
+    LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0));
 
     requestStages.addStages(stages);
     updateServiceStates(changedServices, changedComponents, changedHosts, 
ignoredHosts);

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index fc1ec99..18f9d6b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -318,6 +318,7 @@ public class RequestResourceProvider extends 
AbstractControllerResourceProvider
       List<Long> requestIds = actionManager.getRequestsByStatus(status,
         maxResults != null ? maxResults : BaseRequest.DEFAULT_PAGE_SIZE,
         ascOrder != null ? ascOrder : false);
+      LOG.debug("List<Long> requestIds = actionManager.getRequestsByStatus = 
{}", requestIds.size());
 
       response.addAll(getRequestResources(clusterName, actionManager, 
requestIds,
           requestedPropertyIds));
@@ -340,6 +341,7 @@ public class RequestResourceProvider extends 
AbstractControllerResourceProvider
                                                    Set<String> 
requestedPropertyIds) {
 
     List<org.apache.ambari.server.actionmanager.Request> requests = 
actionManager.getRequests(requestIds);
+    LOG.debug("requests = actionManager.getRequests(requestIds)={}", 
requests.size());
 
     Map<Long, Resource> resourceMap = new HashMap<Long, Resource>();
 
@@ -417,6 +419,11 @@ public class RequestResourceProvider extends 
AbstractControllerResourceProvider
 
     int inProgressTaskCount = taskCount - completedTaskCount - queuedTaskCount 
- pendingTaskCount;
 
+    LOG.debug("taskCount={}, inProgressTaskCount={}, completedTaskCount={}, 
queuedTaskCount={}, " +
+      "pendingTaskCount={}, failedTaskCount={}, abortedTaskCount={}, 
timedOutTaskCount={}",
+      taskCount, inProgressTaskCount, completedTaskCount, queuedTaskCount, 
pendingTaskCount,
+      failedTaskCount, abortedTaskCount, timedOutTaskCount);
+
     // determine request status
     HostRoleStatus requestStatus = failedTaskCount > 0 ? HostRoleStatus.FAILED 
:
         abortedTaskCount > 0 ? HostRoleStatus.ABORTED :

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java 
b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index c8460ef..e1a3e42 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -23,9 +23,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 
+import com.google.common.collect.ListMultimap;
+import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.ClusterResponse;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 
 public interface Cluster {
@@ -293,4 +296,12 @@ public interface Cluster {
    * @throws AmbariException
    */
   public void deleteRequestExecution(Long id) throws AmbariException;
+
+  /**
+   * Bulk handle service component host events
+   *
+   * @param eventMap serviceName - event mapping
+   * @return list of failed events
+   */
+  List<ServiceComponentHostEvent> 
processServiceComponentHostEvents(ListMultimap<String, 
ServiceComponentHostEvent> eventMap);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index f2020d1..706c375 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.persistence.RollbackException;
 
+import com.google.common.collect.ListMultimap;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceNotFoundException;
@@ -65,12 +66,14 @@ import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.ClusterHealthReport;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.slf4j.Logger;
@@ -1426,6 +1429,36 @@ public class ClusterImpl implements Cluster {
     return getHostsDesiredConfigs(hostnames);
   }
 
+  @Transactional
+  @Override
+  public List<ServiceComponentHostEvent> 
processServiceComponentHostEvents(ListMultimap<String, 
ServiceComponentHostEvent> eventMap) {
+    List<ServiceComponentHostEvent> failedEvents = new 
ArrayList<ServiceComponentHostEvent>();
+
+    clusterGlobalLock.readLock().lock();
+    try {
+      for (Entry<String, ServiceComponentHostEvent> entry : 
eventMap.entries()) {
+        String serviceName = entry.getKey();
+        ServiceComponentHostEvent event = entry.getValue();
+        try {
+          Service service = getService(serviceName);
+          ServiceComponent serviceComponent = 
service.getServiceComponent(event.getServiceComponentName());
+          ServiceComponentHost serviceComponentHost = 
serviceComponent.getServiceComponentHost(event.getHostName());
+          serviceComponentHost.handleEvent(event);
+        } catch (AmbariException e) {
+          LOG.error("ServiceComponentHost lookup exception ", e);
+          failedEvents.add(event);
+        } catch (InvalidStateTransitionException e) {
+          LOG.error("Invalid transition ", e);
+          failedEvents.add(event);
+        }
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
+    }
+
+    return failedEvents;
+  }
+
   private ClusterHealthReport getClusterHealthReport() throws AmbariException {
 
     int staleConfigsHosts = 0;

Reply via email to