This is an automated email from the ASF dual-hosted git repository. ncole pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 60d3134 [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618) 60d3134 is described below commit 60d313463ce52c8df5c2268491a70c8cd38eb6a8 Author: ncole <nc...@hortonworks.com> AuthorDate: Thu Nov 15 15:47:28 2018 -0500 [AMBARI-24909] Parallel Client Fixes for Server Side Tasks (#2618) --- .../resource_management/libraries/script/script.py | 2 +- .../orchestrate/ParallelClientGroupingBuilder.java | 136 +++++++++++++++------ 2 files changed, 102 insertions(+), 36 deletions(-) diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py index 67bfca1..76893fd 100644 --- a/ambari-common/src/main/python/resource_management/libraries/script/script.py +++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py @@ -1056,7 +1056,7 @@ class Script(object): if self.should_expose_component_version("restart"): self.save_component_version_to_structured_out("restart") - def post_upgrade_restart(env, upgrade_type=None): + def post_upgrade_restart(self, env, upgrade_type=None): """ To be overridden by subclasses """ diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java index 9411995..0ed9931 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/ParallelClientGroupingBuilder.java @@ -60,6 +60,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { return; } + Task task = resolveTask(upgradeContext, pc); + // !!! 
better not + if (null == task) { + return; + } + Iterator<String> hostIterator = hostsType.getHosts().iterator(); HostHolder holder = new HostHolder(); holder.m_firstHost = hostIterator.next(); @@ -68,11 +74,10 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { holder.m_remainingHosts.add(hostIterator.next()); } - holder.m_component = pc; - holder.m_tasks = new ArrayList<>(); - holder.m_tasks.addAll(resolveTasks(upgradeContext, true, pc)); - holder.m_tasks.add(resolveTask(upgradeContext, pc)); - holder.m_tasks.addAll(resolveTasks(upgradeContext, false, pc)); + holder.m_component = pc.name; + holder.m_tasks = Collections.singletonList(task); + holder.m_preTasks = resolveTasks(upgradeContext, true, pc); + holder.m_postTasks = resolveTasks(upgradeContext, false, pc); serviceToHostMap.put(service, holder); } @@ -91,36 +96,40 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { // !!! create a stage wrapper for the service check on the first host // !!! create a stage wrapper for the remaining hosts - serviceToHostMap.forEach((service, holder) -> { - String component = holder.m_component.name; + // !!! when building stages, the pre- and post- tasks should be run sequentially, + // which means they should be in their own stages. that may seem counter-intuitive, + // but the parallelism is across hosts, not stages. - List<TaskWrapper> wrappers = buildWrappers(service, component, holder.m_tasks, - Collections.singleton(holder.m_firstHost), true); + serviceToHostMap.forEach((service, holder) -> { - String text = getStageText("Upgrading", - upgradeContext.getComponentDisplay(service, component), - Collections.singleton(holder.m_firstHost)); + // !!! pre-tasks for the first host + starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_preTasks, true, "Preparing")); - // !!! this is a poor assumption - StageWrapper.Type type = wrappers.get(0).getTasks().get(0).getStageWrapperType(); + // !!! 
upgrades for the first host + starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_tasks, true, "Upgrading")); - StageWrapper stage = new StageWrapper(type, text, new HashMap<>(), wrappers); + // !!! post tasks for the first host + starterUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_postTasks, true, "Completing")); - // !!! force the service check on the first host + // !!! service check for the first host StageWrapper serviceCheck = new ServiceCheckStageWrapper(service, upgradeContext.getServiceDisplay(service), false, holder.m_firstHost); - - starterUpgrades.add(stage); starterUpgrades.add(serviceCheck); - wrappers = buildWrappers(service, component, holder.m_tasks, holder.m_remainingHosts, false); + // !!! pre-tasks for remaining hosts + finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_preTasks, false, "Prepare Remaining")); - text = getStageText("Upgrade Remaining", - upgradeContext.getComponentDisplay(service, component), - holder.m_remainingHosts); - stage = new StageWrapper(type, text, new HashMap<>(), wrappers); + // !!! upgrades for the remaining hosts + finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_tasks, false, "Upgrade Remaining")); - finisherUpgrades.add(stage); + // !!! 
post tasks for the remaining hosts + finisherUpgrades.addAll(buildStageWrappers(upgradeContext, service, holder, + holder.m_postTasks, false, "Complete Remaining")); }); List<StageWrapper> results = new ArrayList<>(stageWrappers); @@ -132,27 +141,81 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { } /** + * Builds the stages for the components + * @param upgradeContext + * the upgrade context + * @param service + * the service name + * @param holder + * the holder of component and hosts + * @param tasks + * the tasks to wrap + * @param firstHost + * {@code true} if wrapping for the first host + * @param prefix + * the text prefix for the stage + * @return + * the list of stage wrappers + */ + private List<StageWrapper> buildStageWrappers(UpgradeContext upgradeContext, String service, + HostHolder holder, List<Task> tasks, boolean firstHost, String prefix) { + + if (CollectionUtils.isEmpty(tasks)) { + return Collections.emptyList(); + } + + Set<String> hosts = firstHost ? Collections.singleton(holder.m_firstHost) : + holder.m_remainingHosts; + + String component = holder.m_component; + String componentDisplay = upgradeContext.getComponentDisplay(service, component); + String text = getStageText(prefix, componentDisplay, hosts); + + List<TaskWrapper> wrappers = buildTaskWrappers(service, component, tasks, hosts, firstHost); + + List<StageWrapper> results = new ArrayList<>(); + + wrappers.forEach(task -> { + // !!! there should only be one task per wrapper, so this assumption is ok for now + StageWrapper.Type type = task.getTasks().get(0).getStageWrapperType(); + + StageWrapper stage = new StageWrapper(type, text, new HashMap<>(), + Collections.singletonList(task)); + results.add(stage); + }); + + return results; + } + + /** * Build the wrappers for the tasks. 
* + * @param service + * the service name + * @param component + * the component name * @param tasks * the tasks to wrap * @param hosts * the hosts where the tasks should run + * @param firstHost + * {@code true} if these wrappers are for the first host * @return + * the list of task wrappers */ - private List<TaskWrapper> buildWrappers(String service, String component, + private List<TaskWrapper> buildTaskWrappers(String service, String component, List<Task> tasks, Set<String> hosts, boolean firstHost) { List<TaskWrapper> results = new ArrayList<>(); - String ambariServerHostname = StageUtils.getHostName(); - - for (Task task : tasks) { - - // only add the server-side task if there are actual hosts for the service/component + tasks.forEach(task -> { + // server side actions should run only once (the first host) if (task.getType().isServerAction()) { - results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task)); - continue; + if (firstHost) { + String ambariServerHostname = StageUtils.getHostName(); + results.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), task)); + } + return; } // FIXME how to handle master-only types @@ -163,12 +226,12 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { // !!! singular types have already run when firstHost is true if (et.hosts != ExecuteHostType.ALL) { - continue; + return; } } results.add(new TaskWrapper(service, component, hosts, task)); - } + }); return results; } @@ -177,10 +240,13 @@ public class ParallelClientGroupingBuilder extends StageWrapperBuilder { * Temporary holder for building stage wrappers */ private static class HostHolder { - private ProcessingComponent m_component; + private String m_component; private String m_firstHost; private Set<String> m_remainingHosts = new HashSet<>(); + // !!! 
there will only ever be one, but code is cleaner this way private List<Task> m_tasks; + private List<Task> m_preTasks; + private List<Task> m_postTasks; } }