Repository: ambari Updated Branches: refs/heads/branch-2.1 7dbd00050 -> 48c9c7412
AMBARI-13550. ActionScheduler#filterParallelPerHostStages should not filter out stages with server-side actions (rlevas) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/48c9c741 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/48c9c741 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/48c9c741 Branch: refs/heads/branch-2.1 Commit: 48c9c7412c12e88223d343800b3927390f76bd6f Parents: 7dbd000 Author: Robert Levas <[email protected]> Authored: Wed Oct 28 14:14:39 2015 -0400 Committer: Robert Levas <[email protected]> Committed: Wed Oct 28 14:14:49 2015 -0400 ---------------------------------------------------------------------- .../server/actionmanager/ActionScheduler.java | 90 +++++++++++++++++-- .../ambari/server/actionmanager/Stage.java | 2 +- .../actionmanager/TestActionScheduler.java | 92 +++++++++++++++++++- 3 files changed, 170 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/48c9c741/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 7b5adca..982bb15 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -429,25 +429,97 @@ class ActionScheduler implements Runnable { } /** - * Returns filtered list of stages following the rule: - * 1) remove stages that has the same host. 
Leave only first stage, the rest that have same host of any operation will be filtered - * 2) do not remove stages intersected by host if they have intersection by background command - * @param stages - * @return + * Returns filtered list of stages such that the returned list is an ordered list of stages that may + * be executed in parallel or in the order in which they are presented + * <p/> + * Assumption: the list of stages supplied as input is ordered by request id and then stage id. + * <p/> + * Rules: + * <ul> + * <li> + * Stages are filtered such that the first stage in the list (assumed to be the first pending + * stage from the earliest active request) has priority + * </li> + * <li> + * No stage in any request may be executed before an earlier stage in the same request + * </li> + * <li> + * Stages in different requests may be performed in parallel if the relevant hosts for the + * stage in the later requests do not intersect with the union of hosts from (pending) stages + * in earlier requests + * </li> + * </ul> + * + * @param stages the stages to process + * @return a list of stages that may be executed in parallel */ private List<Stage> filterParallelPerHostStages(List<Stage> stages) { List<Stage> retVal = new ArrayList<Stage>(); Set<String> affectedHosts = new HashSet<String>(); - for(Stage s : stages){ + Set<Long> affectedRequests = new HashSet<Long>(); + + for (Stage s : stages) { + long requestId = s.getRequestId(); + + if (LOG.isTraceEnabled()) { + LOG.trace("==> Processing stage: {}/{} ({}) for {}", requestId, s.getStageId(), s.getRequestContext()); + } + + boolean addStage = true; + + // Iterate over the relevant hosts for this stage to see if any intersect with the set of + // hosts needed for previous stages. If any intersection occurs, this stage may not be + // executed in parallel.
for (String host : s.getHosts()) { - if (!affectedHosts.contains(host)) { - if(!isStageHasBackgroundCommandsOnly(s, host)){ + LOG.trace("===> Processing Host {}", host); + + if (affectedHosts.contains(host)) { + if (LOG.isTraceEnabled()) { + LOG.trace("===> Skipping stage since it utilizes at least one host that a previous stage requires: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext()); + } + + addStage &= false; + } else { + if (!Stage.INTERNAL_HOSTNAME.equalsIgnoreCase(host) && !isStageHasBackgroundCommandsOnly(s, host)) { + LOG.trace("====> Adding host to affected hosts: {}", host); affectedHosts.add(host); } - retVal.add(s); + + addStage &= true; } } + + // If this stage is for a request that we have already processed, then it cannot execute in + // parallel since only one stage per request may execute at a time. The first time we encounter + // a request id, it will be for the first pending stage for that request, so it is a candidate + // for execution at this time - if the previous test for host intersection succeeds. + if (affectedRequests.contains(requestId)) { + if (LOG.isTraceEnabled()) { + LOG.trace("===> Skipping stage since the request it is in has been processed already: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext()); + } + + addStage = false; + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("====> Adding request to affected requests: {}", requestId); + } + + affectedRequests.add(requestId); + addStage &= true; + } + + // If both tests pass - the stage is the first pending stage in its request and the hosts + // required in the stage do not intersect with hosts from stages that should occur before this, + // then add it to the list of stages that may be executed in parallel.
+ if (addStage) { + if (LOG.isTraceEnabled()) { + LOG.trace("===> Adding stage to return value: {}/{} ({})", s.getRequestId(), s.getStageId(), s.getRequestContext()); + } + + retVal.add(s); + } } + return retVal; } http://git-wip-us.apache.org/repos/asf/ambari/blob/48c9c741/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index 8b2703c..ef50963 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -63,7 +63,7 @@ public class Stage { * don't want stages getting confused with Ambari vs cluster hosts, so * don't use {@link StageUtils#getHostName()} */ - private static final String INTERNAL_HOSTNAME = "_internal_ambari"; + public static final String INTERNAL_HOSTNAME = "_internal_ambari"; private static Logger LOG = LoggerFactory.getLogger(Stage.class); private final long requestId; http://git-wip-us.apache.org/repos/asf/ambari/blob/48c9c741/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index f8f9ce9..bc4d397 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -636,6 +636,79 @@ public class TestActionScheduler { } /** + * Test server actions in multiple requests. 
+ * + * This is used to make sure the server-side actions do not get filtered out from + * {@link org.apache.ambari.server.actionmanager.ActionScheduler#filterParallelPerHostStages(java.util.List)} + */ + @Test + public void testServerActionInMultipleRequests() throws Exception { + ActionQueue aq = new ActionQueue(); + Clusters fsm = mock(Clusters.class); + Cluster oneClusterMock = mock(Cluster.class); + Service serviceObj = mock(Service.class); + ServiceComponent scomp = mock(ServiceComponent.class); + ServiceComponentHost sch = mock(ServiceComponentHost.class); + UnitOfWork unitOfWork = mock(UnitOfWork.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); + when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); + when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); + when(scomp.getServiceComponentHost(anyString())).thenReturn(sch); + when(serviceObj.getCluster()).thenReturn(oneClusterMock); + + String clusterName = "cluster1"; + String hostname1 = "ahost.ambari.apache.org"; + String hostname2 = "bhost.ambari.apache.org"; + HashMap<String, ServiceComponentHost> hosts = + new HashMap<String, ServiceComponentHost>(); + hosts.put(hostname1, sch); + hosts.put(hostname2, sch); + hosts.put(Stage.INTERNAL_HOSTNAME, sch); + when(scomp.getServiceComponentHosts()).thenReturn(hosts); + + List<Stage> stages = new ArrayList<Stage>(); + Stage stage01 = createStage(clusterName, 0, 1); + addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1); + + Stage stage11 = createStage("cluster1", 1, 1); + addTask(stage11, hostname1, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 2); + + Stage stage02 = createStage("cluster1", 0, 2); + addTask(stage02, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 3); + + Stage stage12 = createStage("cluster1", 1, 2); + addTask(stage12, hostname2, clusterName, 
Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 4); + + stages.add(stage01); + stages.add(stage11); + stages.add(stage02); + stages.add(stage12); + + ActionDBAccessor db = mock(ActionDBAccessor.class); + + RequestEntity request = mock(RequestEntity.class); + when(request.isExclusive()).thenReturn(false); + when(db.getRequestEntity(anyLong())).thenReturn(request); + + when(db.getCommandsInProgressCount()).thenReturn(stages.size()); + when(db.getStagesInProgress()).thenReturn(stages); + + Properties properties = new Properties(); + properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true"); + Configuration conf = new Configuration(properties); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, + new HostsMap((String) null), + unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf); + + scheduler.doWork(); + + Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name())); + Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, Role.KERBEROS_CLIENT.name())); + Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name())); + Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname2, Role.KERBEROS_CLIENT.name())); + } + + /** * Test server action */ @Test @@ -1560,21 +1633,32 @@ public class TestActionScheduler { return report; } - private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, - RoleCommand roleCommand, Service.Type service, int taskId, - int stageId, int requestId) { + private Stage createStage(String clusterName, int stageId, int requestId) { Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask", CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}"); stage.setStageId(stageId); + return 
stage; + } + + private Stage addTask(Stage stage, String hostname, String clusterName, Role role, + RoleCommand roleCommand, String serviceName, int taskId) { stage.addHostRoleExecutionCommand(hostname, role, roleCommand, new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"), - clusterName, service.toString(), false, false); + clusterName, serviceName, false, false); stage.getExecutionCommandWrapper(hostname, role.toString()).getExecutionCommand(); stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId); return stage; } + private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, + RoleCommand roleCommand, Service.Type service, int taskId, + int stageId, int requestId) { + Stage stage = createStage(clusterName, stageId, requestId); + return addTask(stage, hostname, clusterName, role, roleCommand, service.name(), taskId); + } + + private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand, String customCommandName, Service.Type service, int taskId, int stageId, int requestId) { Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId);
