AMBARI-20640. Upgrade server-side actions should be performed only one time per group (ncole)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad118045 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad118045 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad118045 Branch: refs/heads/branch-feature-AMBARI-12556 Commit: ad118045afd5381b7d028294faaa1b8fc3c41b8e Parents: f8827fe Author: Nate Cole <[email protected]> Authored: Thu Mar 30 20:53:17 2017 -0400 Committer: Nate Cole <[email protected]> Committed: Mon Apr 3 10:25:43 2017 -0400 ---------------------------------------------------------------------- .../state/stack/upgrade/ColocatedGrouping.java | 80 +++++++- .../state/stack/upgrade/TaskWrapperBuilder.java | 10 +- .../AmbariManagementControllerTest.java | 2 +- .../ambari/server/state/UpgradeHelperTest.java | 188 ++++++++++++++++++- .../upgrades/upgrade_multi_server_tasks.xml | 88 +++++++++ 5 files changed, 352 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ad118045/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java index c939320..272264f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.state.stack.upgrade; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -35,10 +36,14 @@ import org.apache.ambari.server.stack.HostsType; import org.apache.ambari.server.state.UpgradeContext; import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent; import org.apache.ambari.server.state.stack.upgrade.StageWrapper.Type; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; @@ -95,6 +100,7 @@ public class ColocatedGrouping extends Grouping { Map<String, List<TaskProxy>> targetMap = ((i++) < count) ? initialBatch : finalBatches; List<TaskProxy> targetList = targetMap.get(host); + if (null == targetList) { targetList = new ArrayList<>(); targetMap.put(host, targetList); @@ -160,8 +166,26 @@ public class ColocatedGrouping extends Grouping { * {@inheritDoc} */ @Override - public List<StageWrapper> build(UpgradeContext upgradeContext, - List<StageWrapper> stageWrappers) { + public List<StageWrapper> build(UpgradeContext upgradeContext, List<StageWrapper> stageWrappers) { + + final List<Task> visitedServerSideTasks = new ArrayList<>(); + + // !!! predicate to ensure server-side tasks are executed once only per grouping + Predicate<Task> predicate = new Predicate<Task>() { + @Override + public boolean apply(Task input) { + if (visitedServerSideTasks.contains(input)) { + return false; + } + + if (input.getType().isServerAction()) { + visitedServerSideTasks.add(input); + } + + return true; + }; + }; + List<StageWrapper> results = new ArrayList<>(stageWrappers); if (LOG.isDebugEnabled()) { @@ -169,7 +193,7 @@ public class ColocatedGrouping extends Grouping { LOG.debug("RU final: {}", finalBatches); } - List<StageWrapper> befores = fromProxies(upgradeContext.getDirection(), initialBatch); + List<StageWrapper> befores = fromProxies(upgradeContext.getDirection(), initialBatch, predicate); results.addAll(befores); if (!befores.isEmpty()) { @@ -189,13 +213,14 @@ public class ColocatedGrouping extends Grouping { results.add(wrapper); } - results.addAll(fromProxies(upgradeContext.getDirection(), finalBatches)); + results.addAll(fromProxies(upgradeContext.getDirection(), finalBatches, predicate)); return results; } private List<StageWrapper> fromProxies(Direction direction, - Map<String, List<TaskProxy>> wrappers) { + Map<String, List<TaskProxy>> wrappers, Predicate<Task> predicate) { + List<StageWrapper> results = new ArrayList<>(); Set<String> serviceChecks = new HashSet<>(); @@ -213,10 +238,27 @@ public class ColocatedGrouping extends Grouping { if (!t.restart) { if (null == wrapper) { - wrapper = new StageWrapper(t.type, t.message, t.getTasksArray()); + TaskWrapper[] tasks = t.getTasksArray(predicate); + + if (LOG.isDebugEnabled()) { + for (TaskWrapper tw : tasks) { + LOG.debug("{}", tw); + } + } + + if (ArrayUtils.isNotEmpty(tasks)) { + wrapper = new StageWrapper(t.type, t.message, tasks); + } } } else { - execwrappers.add(new StageWrapper(StageWrapper.Type.RESTART, t.message, t.getTasksArray())); + TaskWrapper[] tasks = t.getTasksArray(null); + + if (LOG.isDebugEnabled()) { + for (TaskWrapper tw : tasks) { + LOG.debug("{}", tw); + } + } + execwrappers.add(new StageWrapper(StageWrapper.Type.RESTART, t.message, tasks)); } } @@ -345,8 +387,28 @@ public class ColocatedGrouping extends Grouping { return s; } - private TaskWrapper[] getTasksArray() { - return tasks.toArray(new TaskWrapper[0]); + /** + * Get the task wrappers for this proxy. Server-side tasks cannot be executed more than + * one time per grouping. + * @param predicate the predicate to determine if a server-side task has already been added to a wrapper. + * @return the wrappers for a stage + */ + private TaskWrapper[] getTasksArray(Predicate<Task> predicate) { + if (null == predicate) { + return tasks.toArray(new TaskWrapper[tasks.size()]); + } + + List<TaskWrapper> interim = new ArrayList<>(); + + for (TaskWrapper wrapper : tasks) { + Collection<Task> filtered = Collections2.filter(wrapper.getTasks(), predicate); + + if (CollectionUtils.isNotEmpty(filtered)) { + interim.add(wrapper); + } + } + + return interim.toArray(new TaskWrapper[interim.size()]); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/ad118045/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java index bd2bf14..a75fe00 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.ambari.server.stack.HostsType; import org.apache.ambari.server.utils.StageUtils; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +55,10 @@ public class TaskWrapperBuilder { List<TaskWrapper> collection = new ArrayList<>(); for (Task t : tasks) { - if (t.getType().equals(Task.Type.CONFIGURE) || t.getType().equals(Task.Type.MANUAL)) { - // only add the CONFIGURE/MANUAL task if there are actual hosts for the service/component - if (null != hostsType.hosts && !hostsType.hosts.isEmpty()) { - collection.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), params, t)); - } + + // only add the server-side task if there are actual hosts for the service/component + if (t.getType().isServerAction() && CollectionUtils.isNotEmpty(hostsType.hosts)) { + collection.add(new TaskWrapper(service, component, Collections.singleton(ambariServerHostname), params, t)); continue; } http://git-wip-us.apache.org/repos/asf/ambari/blob/ad118045/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java index 483880a..554e089 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java @@ -7310,7 +7310,7 @@ public class AmbariManagementControllerTest { Assert.assertEquals(1, responsesWithParams.size()); StackVersionResponse resp = responsesWithParams.iterator().next(); assertNotNull(resp.getUpgradePacks()); - assertEquals(12, resp.getUpgradePacks().size()); + assertEquals(13, resp.getUpgradePacks().size()); assertTrue(resp.getUpgradePacks().contains("upgrade_test")); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ad118045/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java index 8e5ad0a..0dd7f58 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java @@ -601,7 +601,7 @@ public class UpgradeHelperTest { assertEquals(4, groups.get(0).items.size()); assertEquals(8, groups.get(1).items.size()); assertEquals(5, groups.get(2).items.size()); - assertEquals(8, groups.get(3).items.size()); + assertEquals(7, groups.get(3).items.size()); assertEquals(8, groups.get(4).items.size()); } @@ -943,6 +943,7 @@ public class UpgradeHelperTest { Map<String, String> hiveConfigs = new HashMap<>(); hiveConfigs.put("fooKey", "THIS-BETTER-CHANGE"); hiveConfigs.put("ifFooKey", "ifFooValue"); + ConfigurationRequest configurationRequest = new ConfigurationRequest(); configurationRequest.setClusterName(cluster.getClusterName()); configurationRequest.setType("hive-site"); @@ -1870,6 +1871,191 @@ public class UpgradeHelperTest { assertTrue(groups.isEmpty()); } + @Test + public void testMultipleServerTasks() throws Exception { + + // !!! make a two node cluster with just ZK + Clusters clusters = injector.getInstance(Clusters.class); + ServiceFactory serviceFactory = injector.getInstance(ServiceFactory.class); + + String clusterName = "c1"; + + StackId stackId = new StackId("HDP-2.1.1"); + StackId stackId2 = new StackId("HDP-2.2.0"); + + clusters.addCluster(clusterName, stackId); + Cluster c = clusters.getCluster(clusterName); + + helper.getOrCreateRepositoryVersion(stackId, + c.getDesiredStackVersion().getStackVersion()); + + helper.getOrCreateRepositoryVersion(stackId2,"2.2.0"); + + helper.getOrCreateRepositoryVersion(stackId2, UPGRADE_VERSION); + + c.createClusterVersion(stackId, + c.getDesiredStackVersion().getStackVersion(), "admin", + RepositoryVersionState.INSTALLING); + + for (int i = 0; i < 2; i++) { + String hostName = "h" + (i+1); + clusters.addHost(hostName); + Host host = clusters.getHost(hostName); + + Map<String, String> hostAttributes = new HashMap<>(); + hostAttributes.put("os_family", "redhat"); + hostAttributes.put("os_release_version", "6"); + host.setHostAttributes(hostAttributes); + + clusters.mapHostToCluster(hostName, clusterName); + } + + // !!! add services + c.addService(serviceFactory.createNew(c, "ZOOKEEPER")); + + Service s = c.getService("ZOOKEEPER"); + ServiceComponent sc = s.addServiceComponent("ZOOKEEPER_SERVER"); + sc.addServiceComponentHost("h1"); + sc.addServiceComponentHost("h2"); + + sc = s.addServiceComponent("ZOOKEEPER_CLIENT"); + sc.addServiceComponentHost("h1"); + sc.addServiceComponentHost("h2"); + + EasyMock.reset(m_masterHostResolver); + + expect(m_masterHostResolver.getCluster()).andReturn(c).anyTimes(); + + HostsType type = new HostsType(); + type.hosts.addAll(Arrays.asList("h1", "h2")); + expect(m_masterHostResolver.getMasterAndHosts("ZOOKEEPER", "ZOOKEEPER_SERVER")).andReturn(type).anyTimes(); + + type = new HostsType(); + type.hosts.addAll(Arrays.asList("h1", "h2")); + expect(m_masterHostResolver.getMasterAndHosts("ZOOKEEPER", "ZOOKEEPER_CLIENT")).andReturn(type).anyTimes(); + + + replay(m_masterHostResolver); + + Map<String, UpgradePack> upgrades = ambariMetaInfo.getUpgradePacks("HDP", "2.1.1"); + + ServiceInfo si = ambariMetaInfo.getService("HDP", "2.1.1", "ZOOKEEPER"); + si.setDisplayName("Zk"); + ComponentInfo ci = si.getComponentByName("ZOOKEEPER_SERVER"); + ci.setDisplayName("ZooKeeper1 Server2"); + + UpgradePack upgrade = upgrades.get("upgrade_multi_server_tasks"); + assertNotNull(upgrade); + + UpgradeContext context = m_upgradeContextFactory.create(c, UpgradeType.NON_ROLLING, + Direction.UPGRADE, "2.2.0", new HashMap<String, Object>()); + context.setResolver(m_masterHostResolver); + + List<UpgradeGroupHolder> groups = m_upgradeHelper.createSequence(upgrade, context); + + assertEquals(2, groups.size()); + + + // zk server as a colocated grouping first. XML says to run a manual, 2 configs, and an execute + UpgradeGroupHolder group1 = groups.get(0); + assertEquals(7, group1.items.size()); + + // Stage 1. manual, 2 configs, execute + assertEquals(4, group1.items.get(0).getTasks().size()); + TaskWrapper taskWrapper = group1.items.get(0).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.MANUAL, taskWrapper.getTasks().get(0).getType()); + + taskWrapper = group1.items.get(0).getTasks().get(1); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.CONFIGURE, taskWrapper.getTasks().get(0).getType()); + + taskWrapper = group1.items.get(0).getTasks().get(2); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.CONFIGURE, taskWrapper.getTasks().get(0).getType()); + + taskWrapper = group1.items.get(0).getTasks().get(3); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.EXECUTE, taskWrapper.getTasks().get(0).getType()); + + // Stage 2. restart for h1 + assertEquals(1, group1.items.get(1).getTasks().size()); + taskWrapper = group1.items.get(1).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.RESTART, taskWrapper.getTasks().get(0).getType()); + assertTrue(taskWrapper.getHosts().contains("h1")); + + // Stage 3. service check + assertEquals(1, group1.items.get(2).getTasks().size()); + taskWrapper = group1.items.get(2).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.SERVICE_CHECK, taskWrapper.getTasks().get(0).getType()); + + // stage 4. manual step for validation + assertEquals(1, group1.items.get(3).getTasks().size()); + taskWrapper = group1.items.get(3).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.MANUAL, taskWrapper.getTasks().get(0).getType()); + + // Stage 5. repeat execute as it's not a server-side task. no configure or manual tasks + assertEquals(1, group1.items.get(4).getTasks().size()); + taskWrapper = group1.items.get(4).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.EXECUTE, taskWrapper.getTasks().get(0).getType()); + + // Stage 6. restart for h2. + assertEquals(1, group1.items.get(5).getTasks().size()); + taskWrapper = group1.items.get(5).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.RESTART, taskWrapper.getTasks().get(0).getType()); + assertTrue(taskWrapper.getHosts().contains("h2")); + + // Stage 7. service check + assertEquals(1, group1.items.get(6).getTasks().size()); + taskWrapper = group1.items.get(6).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.SERVICE_CHECK, taskWrapper.getTasks().get(0).getType()); + + + // zk client + UpgradeGroupHolder group2 = groups.get(1); + assertEquals(5, group2.items.size()); + + // Stage 1. Configure + assertEquals(1, group2.items.get(0).getTasks().size()); + taskWrapper = group2.items.get(0).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.CONFIGURE, taskWrapper.getTasks().get(0).getType()); + + // Stage 2. Custom class + assertEquals(1, group2.items.get(1).getTasks().size()); + taskWrapper = group2.items.get(1).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.SERVER_ACTION, taskWrapper.getTasks().get(0).getType()); + + // Stage 3. Restart client on h1 + assertEquals(1, group2.items.get(2).getTasks().size()); + taskWrapper = group2.items.get(2).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.RESTART, taskWrapper.getTasks().get(0).getType()); + + // Stage 4. Restart client on h2 (no configure or custom class) + assertEquals(1, group2.items.get(3).getTasks().size()); + taskWrapper = group2.items.get(3).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.RESTART, taskWrapper.getTasks().get(0).getType()); + + // Stage 5. service check + assertEquals(1, group2.items.get(4).getTasks().size()); + taskWrapper = group2.items.get(4).getTasks().get(0); + assertEquals(1, taskWrapper.getTasks().size()); + assertEquals(Task.Type.SERVICE_CHECK, taskWrapper.getTasks().get(0).getType()); + + } + + + + /** * Tests {@link UpgradeType#HOST_ORDERED}, specifically that the orchestration * can properly expand the single {@link HostOrderGrouping} and create the http://git-wip-us.apache.org/repos/asf/ambari/blob/ad118045/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_multi_server_tasks.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_multi_server_tasks.xml b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_multi_server_tasks.xml new file mode 100644 index 0000000..de99d59 --- /dev/null +++ b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_multi_server_tasks.xml @@ -0,0 +1,88 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<upgrade xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="upgrade-pack.xsd"> + <target>2.2.*.*</target> + <target-stack>HDP-2.2.0</target-stack> + <type>ROLLING</type> + <prerequisite-checks /> + + <order> + <group xsi:type="colocated" name="ZOOKEEPER" title="Zookeeper"> + <skippable>true</skippable> + <allow-retry>false</allow-retry> + <service name="ZOOKEEPER"> + <component>ZOOKEEPER_SERVER</component> + </service> + + <batch> + <percent>20</percent> + <message>Please run additional tests on {{components}}</message> + </batch> + + </group> + + <group name="CLIENTS" title="Zookeeper Clients"> + <skippable>true</skippable> + <allow-retry>false</allow-retry> + <service name="ZOOKEEPER"> + <component>ZOOKEEPER_CLIENT</component> + </service> + </group> + + </order> + + <processing> + <service name="ZOOKEEPER"> + <component name="ZOOKEEPER_SERVER"> + <pre-upgrade> + <task xsi:type="manual"> + <message>This is a manual task with a placeholder of {{foo/bar}}</message> + </task> + + <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" /> + <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" /> + + + <task xsi:type="execute"> + <script>foo</script> + <function>list</function> + </task> + + </pre-upgrade> + <pre-downgrade copy-upgrade="true" /> + <upgrade> + <task xsi:type="restart-task" /> + </upgrade> + </component> + + <component name="ZOOKEEPER_CLIENT"> + <pre-upgrade> + <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" /> + <task xsi:type="server_action" class="org.apache.ambari.server.serveraction.upgrades.FixLzoCodecPath"/> + </pre-upgrade> + + <pre-downgrade /> + + <upgrade> + <task xsi:type="restart-task" /> + </upgrade> + </component> + + </service> + </processing> +</upgrade>
