Repository: ambari Updated Branches: refs/heads/trunk 8690cfdb4 -> ad2f5442b
AMBARI-11154 - Storm Upgrade Pack For HDP-2.2 to HDP-2.3 (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad2f5442 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad2f5442 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad2f5442 Branch: refs/heads/trunk Commit: ad2f5442b3706235969c8982d3fa92fea808dc29 Parents: 8690cfd Author: Jonathan Hurley <[email protected]> Authored: Wed May 20 17:00:31 2015 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Mon May 25 07:23:01 2015 -0400 ---------------------------------------------------------------------- .../ambari/server/actionmanager/Stage.java | 65 ++++----- .../ambari/server/agent/ExecutionCommand.java | 31 ++++- .../serveraction/upgrades/ConfigureAction.java | 38 +++++- .../state/stack/upgrade/ConfigureTask.java | 3 - .../server/state/stack/upgrade/ManualTask.java | 4 - .../ambari/server/state/stack/upgrade/Task.java | 7 + .../apache/ambari/server/utils/StageUtils.java | 53 ++++---- .../STORM/0.9.1.2.1/configuration/storm-env.xml | 3 +- .../STORM/0.9.1.2.1/package/scripts/nimbus.py | 5 + .../0.9.1.2.1/package/scripts/nimbus_prod.py | 1 + .../0.9.1.2.1/package/scripts/params_linux.py | 7 +- .../STORM/0.9.1.2.1/package/scripts/service.py | 56 ++++---- .../0.9.1.2.1/package/scripts/status_params.py | 2 + .../0.9.1.2.1/package/scripts/storm_upgrade.py | 133 +++++++++++++++++++ .../0.9.1.2.1/package/scripts/supervisor.py | 1 + .../package/scripts/supervisor_prod.py | 1 + .../services/STORM/configuration/storm-env.xml | 4 +- .../stacks/HDP/2.2/upgrades/upgrade-2.3.xml | 64 ++++++++- .../ranger-storm-policymgr-ssl.xml | 4 +- .../configuration/ranger-storm-security.xml | 2 +- .../ExecutionCommandWrapperTest.java | 96 ++++++------- .../server/agent/TestHeartbeatHandler.java | 23 +++- .../stacks/2.1/STORM/test_storm_nimbus.py | 2 + .../stacks/2.1/STORM/test_storm_nimbus_prod.py | 2 + .../stacks/2.1/STORM/test_storm_supervisor.py | 2 + .../2.1/STORM/test_storm_supervisor_prod.py | 2 + 26 files changed, 438 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index 03b3648..135bdc1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -28,7 +28,6 @@ import java.util.TreeMap; import javax.annotation.Nullable; -import com.google.inject.Inject; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; @@ -47,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; +import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; import com.google.inject.persist.Transactional; @@ -100,7 +100,7 @@ public class Stage { @Assisted("commandParamsStage") String commandParamsStage, @Assisted("hostParamsStage") String hostParamsStage, HostRoleCommandFactory hostRoleCommandFactory) { - this.wrappersLoaded = true; + wrappersLoaded = true; this.requestId = requestId; this.logDir = logDir; this.clusterName = clusterName; @@ -109,7 +109,7 @@ public class Stage { this.clusterHostInfo = clusterHostInfo; this.commandParamsStage = commandParamsStage; this.hostParamsStage = hostParamsStage; - this.skippable = false; + skippable = false; this.hostRoleCommandFactory = hostRoleCommandFactory; } @@ -248,10 +248,10 @@ public class Stage { } //used on stage creation only, no need to check if wrappers loaded this.stageId = stageId; - for (String host: this.commandsToSend.keySet()) { - for (ExecutionCommandWrapper wrapper : this.commandsToSend.get(host)) { + for (String host: commandsToSend.keySet()) { + for (ExecutionCommandWrapper wrapper : commandsToSend.get(host)) { ExecutionCommand cmd = wrapper.getExecutionCommand(); - cmd.setCommandId(StageUtils.getActionId(requestId, stageId)); + cmd.setRequestAndStage(requestId, stageId); } } } @@ -275,34 +275,34 @@ public class Stage { hrc.setExecutionCommandWrapper(wrapper); cmd.setHostname(hostName); cmd.setClusterName(clusterName); - cmd.setCommandId(this.getActionId()); + cmd.setRequestAndStage(requestId, stageId); cmd.setRole(role.name()); cmd.setRoleCommand(command); cmd.setServiceName(""); - Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(hostName); + Map<String, HostRoleCommand> hrcMap = hostRoleCommands.get(hostName); if (hrcMap == null) { hrcMap = new LinkedHashMap<String, HostRoleCommand>(); - this.hostRoleCommands.put(hostName, hrcMap); + hostRoleCommands.put(hostName, hrcMap); } if (hrcMap.get(role.toString()) != null) { throw new RuntimeException( "Setting the host role command second time for same stage: stage=" - + this.getActionId() + ", host=" + hostName + ", role=" + role); + + getActionId() + ", host=" + hostName + ", role=" + role); } hrcMap.put(role.toString(), hrc); - List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(hostName); + List<ExecutionCommandWrapper> execCmdList = commandsToSend.get(hostName); if (execCmdList == null) { execCmdList = new ArrayList<ExecutionCommandWrapper>(); - this.commandsToSend.put(hostName, execCmdList); + commandsToSend.put(hostName, execCmdList); } if (execCmdList.contains(wrapper)) { //todo: proper exception throw new RuntimeException( "Setting the execution command second time for same stage: stage=" - + this.getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event); + + getActionId() + ", host=" + hostName + ", role=" + role+ ", event="+event); } execCmdList.add(wrapper); return wrapper; @@ -458,7 +458,7 @@ public class Stage { */ public synchronized List<String> getHosts() { // TODO: Check whether method should be synchronized List<String> hlist = new ArrayList<String>(); - for (String h : this.hostRoleCommands.keySet()) { + for (String h : hostRoleCommands.keySet()) { hlist.add(h); } return hlist; @@ -504,19 +504,19 @@ public class Stage { } public long getLastAttemptTime(String host, String role) { - return this.hostRoleCommands.get(host).get(role).getLastAttemptTime(); + return hostRoleCommands.get(host).get(role).getLastAttemptTime(); } public short getAttemptCount(String host, String role) { - return this.hostRoleCommands.get(host).get(role).getAttemptCount(); + return hostRoleCommands.get(host).get(role).getAttemptCount(); } public void incrementAttemptCount(String hostname, String role) { - this.hostRoleCommands.get(hostname).get(role).incrementAttemptCount(); + hostRoleCommands.get(hostname).get(role).incrementAttemptCount(); } public void setLastAttemptTime(String host, String role, long t) { - this.hostRoleCommands.get(host).get(role).setLastAttemptTime(t); + hostRoleCommands.get(host).get(role).setLastAttemptTime(t); } public ExecutionCommandWrapper getExecutionCommandWrapper(String hostname, @@ -535,41 +535,41 @@ public class Stage { } public long getStartTime(String hostname, String role) { - return this.hostRoleCommands.get(hostname).get(role).getStartTime(); + return hostRoleCommands.get(hostname).get(role).getStartTime(); } public void setStartTime(String hostname, String role, long startTime) { - this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime); + hostRoleCommands.get(hostname).get(role).setStartTime(startTime); } public HostRoleStatus getHostRoleStatus(String hostname, String role) { - return this.hostRoleCommands.get(hostname).get(role).getStatus(); + return hostRoleCommands.get(hostname).get(role).getStatus(); } public void setHostRoleStatus(String host, String role, HostRoleStatus status) { - this.hostRoleCommands.get(host).get(role).setStatus(status); + hostRoleCommands.get(host).get(role).setStatus(status); } public ServiceComponentHostEventWrapper getFsmEvent(String hostname, String roleStr) { - return this.hostRoleCommands.get(hostname).get(roleStr).getEvent(); + return hostRoleCommands.get(hostname).get(roleStr).getEvent(); } public void setExitCode(String hostname, String role, int exitCode) { - this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode); + hostRoleCommands.get(hostname).get(role).setExitCode(exitCode); } public int getExitCode(String hostname, String role) { - return this.hostRoleCommands.get(hostname).get(role).getExitCode(); + return hostRoleCommands.get(hostname).get(role).getExitCode(); } public void setStderr(String hostname, String role, String stdErr) { - this.hostRoleCommands.get(hostname).get(role).setStderr(stdErr); + hostRoleCommands.get(hostname).get(role).setStderr(stdErr); } public void setStdout(String hostname, String role, String stdOut) { - this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut); + hostRoleCommands.get(hostname).get(role).setStdout(stdOut); } public synchronized boolean isStageInProgress() { @@ -597,9 +597,10 @@ public class Stage { if (hrc == null) { return false; } - for (HostRoleStatus status : statuses) - if (hrc.getStatus().equals(status)) { - return true; + for (HostRoleStatus status : statuses) { + if (hrc.getStatus().equals(status)) { + return true; + } } } } @@ -608,11 +609,11 @@ public class Stage { public Map<String, List<ExecutionCommandWrapper>> getExecutionCommands() { checkWrappersLoaded(); - return this.commandsToSend; + return commandsToSend; } public String getLogDir() { - return this.logDir; + return logDir; } public Map<String, Map<String, HostRoleCommand>> getHostRoleCommands() { http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java index 6c254e8..e4abb1d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java @@ -17,7 +17,12 @@ */ package org.apache.ambari.server.agent; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.utils.StageUtils; @@ -42,6 +47,12 @@ public class ExecutionCommand extends AgentCommand { @SerializedName("clusterName") private String clusterName; + @SerializedName("requestId") + private long requestId; + + @SerializedName("stageId") + private long stageId; + @SerializedName("taskId") private long taskId; @@ -95,8 +106,20 @@ public class ExecutionCommand extends AgentCommand { return commandId; } - public void setCommandId(String commandId) { - this.commandId = commandId; + /** + * Sets the request and stage on this command. The {@code commandId} field is + * automatically constructed from these as requestId-stageId. + * + * @param requestId + * the ID of the execution request. + * @param stageId + * the ID of the stage request. + */ + public void setRequestAndStage(long requestId, long stageId) { + this.requestId = requestId; + this.stageId = stageId; + + commandId = StageUtils.getActionId(requestId, stageId); } @Override @@ -267,7 +290,7 @@ public class ExecutionCommand extends AgentCommand { * @param params for kerberos commands */ public void setKerberosCommandParams(List<Map<String, String>> params) { - this.kerberosCommandParams = params; + kerberosCommandParams = params; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java index 69a03f5..268832e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java @@ -56,8 +56,23 @@ import com.google.inject.Provider; /** * The {@link ConfigureAction} is used to alter a configuration property during - * an upgrade. It will only produce a new configuration if the value being - * changed is different than the existing value. + * an upgrade. It will only produce a new configuration if an actual change is + * occuring. For some configure tasks, the value is already at the desired + * property or the conditions of the task are not met. In these cases, a new + * configuration will not be created. This task can perform any of the following + * actions in a single declaration: + * <ul> + * <li>Copy a configuration to a new property key, optionally setting a default + * if the original property did not exist</li> + * <li>Copy a configuration to a new property key from one configuration type to + * another, optionally setting a default if the original property did not exist</li> + * <li>Rename a configuration, optionally setting a default if the original + * property did not exist</li> + * <li>Delete a configuration property</li> + * <li>Set a configuration property</li> + * <li>Conditionally set a configuration property based on another configuration + * property value</li> + * </ul> */ public class ConfigureAction extends AbstractServerAction { @@ -274,7 +289,7 @@ public class ConfigureAction extends AbstractServerAction { newValues.put(transfer.toKey, valueToCopy); // append standard output - outputBuffer.append(MessageFormat.format("Created {0}/{1} = {2}\n", configType, + outputBuffer.append(MessageFormat.format("Created {0}/{1} = \"{2}\"\n", configType, transfer.toKey, mask(transfer, valueToCopy))); } break; @@ -294,7 +309,8 @@ public class ConfigureAction extends AbstractServerAction { changedValues = true; // append standard output - outputBuffer.append(MessageFormat.format("Created {0}/{1} with default value {2}\n", + outputBuffer.append(MessageFormat.format( + "Created {0}/{1} with default value \"{2}\"\n", configType, transfer.toKey, mask(transfer, transfer.defaultValue))); } @@ -369,8 +385,16 @@ public class ConfigureAction extends AbstractServerAction { // byproduct of the configure being able to take a list of transfers // without a key/value to set newValues.put(key, value); - outputBuffer.append(MessageFormat.format("{0}/{1} changed to {2}\n", configType, key, - mask(keyValuePair, value))); + + final String message; + if (StringUtils.isEmpty(value)) { + message = MessageFormat.format("{0}/{1} changed to an empty value", configType, key); + } else { + message = MessageFormat.format("{0}/{1} changed to \"{2}\"\n", configType, key, + mask(keyValuePair, value)); + } + + outputBuffer.append(message); } } } @@ -381,7 +405,7 @@ public class ConfigureAction extends AbstractServerAction { String toReplace = newValues.get(replacement.key); if (!toReplace.contains(replacement.find)) { - outputBuffer.append(MessageFormat.format("String {0} was not found in {1}/{2}\n", + outputBuffer.append(MessageFormat.format("String \"{0}\" was not found in {1}/{2}\n", replacement.find, configType, replacement.key)); } else { String replaced = StringUtils.replace(toReplace, replacement.find, replacement.replaceWith); http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java index f5a77c5..95bfb48 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ConfigureTask.java @@ -126,9 +126,6 @@ public class ConfigureTask extends ServerSideActionTask { @XmlElement(name = "set") private List<ConfigurationKeyValue> keyValuePairs; - @XmlElement(name="summary") - public String summary; - @XmlElement(name = "condition") private List<Condition> conditions; http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java index fe933cc..2b1ba56 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ManualTask.java @@ -47,10 +47,6 @@ public class ManualTask extends ServerSideActionTask { @XmlElement(name="message") public String message; - @XmlElement(name="summary") - public String summary; - - @Override public Task.Type getType() { return type; http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java index fbd837c..6416b57 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.state.stack.upgrade; +import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlSeeAlso; @@ -27,6 +28,12 @@ import javax.xml.bind.annotation.XmlSeeAlso; public abstract class Task { /** + * An optional brief description of what this task is doing. + */ + @XmlElement(name = "summary") + public String summary; + + /** * @return the type of the task */ public abstract Type getType(); http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java index 66612bd..aeca69b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java @@ -17,10 +17,27 @@ */ package org.apache.ambari.server.utils; -import org.apache.commons.lang.StringUtils; -import com.google.common.base.Joiner; -import com.google.gson.Gson; -import com.google.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import javax.xml.bind.JAXBException; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -36,6 +53,7 @@ import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent; import org.apache.ambari.server.topology.TopologyManager; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.codehaus.jackson.JsonGenerationException; @@ -43,25 +61,9 @@ import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; -import javax.xml.bind.JAXBException; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; +import com.google.common.base.Joiner; +import com.google.gson.Gson; +import com.google.inject.Inject; public class StageUtils { @@ -201,7 +203,8 @@ public class StageUtils { new ServiceComponentHostInstallEvent("NAMENODE", hostname, now, "HDP-1.2.0"), "cluster1", "HDFS", false); ExecutionCommand execCmd = s.getExecutionCommandWrapper(hostname, "NAMENODE").getExecutionCommand(); - execCmd.setCommandId(s.getActionId()); + + execCmd.setRequestAndStage(s.getRequestId(), s.getStageId()); List<String> slaveHostList = new ArrayList<String>(); slaveHostList.add(hostname); slaveHostList.add("host2"); @@ -245,7 +248,7 @@ public class StageUtils { InputStream is = new ByteArrayInputStream(json.getBytes(Charset.forName("UTF8"))); return mapper.readValue(is, clazz); } - + public static Map<String, String> getCommandParamsStage(ActionExecutionContext actionExecContext) throws AmbariException { return actionExecContext.getParameters() != null ? actionExecContext.getParameters() : new TreeMap<String, String>(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml index 2c7bbc4..5129d87 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/configuration/storm-env.xml @@ -58,7 +58,8 @@ export JAVA_HOME={{java64_home}} # Storm log folder export STORM_LOG_DIR={{log_dir}} -# export STORM_CONF_DIR="" +export STORM_CONF_DIR={{conf_dir}} +export STORM_HOME={{storm_component_home_dir}} </value> </property> http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py index 66b46c8..93f3e05 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus.py @@ -57,8 +57,10 @@ class NimbusDefault(Nimbus): env.set_params(params) if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: conf_select.select(params.stack_name, "storm", params.version) + hdp_select.select("storm-client", params.version) hdp_select.select("storm-nimbus", params.version) + def start(self, env, rolling_restart=False): import params env.set_params(params) @@ -66,16 +68,19 @@ class NimbusDefault(Nimbus): setup_ranger_storm() service("nimbus", action="start") + def stop(self, env, rolling_restart=False): import params env.set_params(params) service("nimbus", action="stop") + def status(self, env): import status_params env.set_params(status_params) check_process_status(status_params.pid_nimbus) + def security_status(self, env): import status_params env.set_params(status_params) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py index 313bb17..f9d64f4 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/nimbus_prod.py @@ -49,6 +49,7 @@ class Nimbus(Script): if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: conf_select.select(params.stack_name, "storm", params.version) + hdp_select.select("storm-client", params.version) hdp_select.select("storm-nimbus", params.version) def start(self, env, rolling_restart=False): http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py index 90e3b7b..41ea1ac 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params_linux.py @@ -55,6 +55,7 @@ sudo = AMBARI_SUDO_BINARY stack_name = default("/hostLevelParams/stack_name", None) version = default("/commandParams/version", None) +storm_component_home_dir = status_params.storm_component_home_dir conf_dir = status_params.conf_dir stack_version_unformatted = str(config['hostLevelParams']['stack_version']) @@ -81,6 +82,8 @@ user_group = config['configurations']['cluster-env']['user_group'] java64_home = config['hostLevelParams']['java_home'] jps_binary = format("{java64_home}/bin/jps") nimbus_port = config['configurations']['storm-site']['nimbus.thrift.port'] +storm_zookeeper_root_dir = default('/configurations/storm-site/storm.zookeeper.root', None) +storm_zookeeper_servers = default('/configurations/storm-site/storm.zookeeper.servers', None) # nimbus.seeds is supported in HDP 2.3.0.0 and higher nimbus_seeds_supported = default('/configurations/storm-env/nimbus_seeds_supported', False) @@ -93,8 +96,7 @@ rest_api_conf_file = format("{conf_dir}/config.yaml") storm_env_sh_template = config['configurations']['storm-env']['content'] jmxremote_port = config['configurations']['storm-env']['jmxremote_port'] -if 'ganglia_server_host' in config['clusterHostInfo'] and \ - len(config['clusterHostInfo']['ganglia_server_host'])>0: +if 'ganglia_server_host' in config['clusterHostInfo'] and len(config['clusterHostInfo']['ganglia_server_host'])>0: ganglia_installed = True ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0] ganglia_report_interval = 60 @@ -135,6 +137,7 @@ if has_metric_collector: metric_collector_host = ams_collector_hosts[0] metric_collector_report_interval = 60 metric_collector_app_id = "nimbus" + metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar" # ranger host http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py index 901aecc..0080beb 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/service.py @@ -18,21 +18,19 @@ limitations under the License. """ - from resource_management.core.resources import Execute from resource_management.core.resources import File from resource_management.libraries.functions.format import format import time -def service( - name, - action='start'): +def service(name, action = 'start'): import params import status_params pid_file = status_params.pid_files[name] - no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1") + no_op_test = format( + "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1") if name == "logviewer" or name == "drpc": tries_count = 12 @@ -45,43 +43,45 @@ def service( process_grep = format("{rest_lib_dir}/storm-rest-.*\.jar$") else: process_grep = format("storm.daemon.{name}$") - + find_proc = format("{jps_binary} -l | grep {process_grep}") write_pid = format("{find_proc} | awk {{'print $1'}} > {pid_file}") crt_pid_cmd = format("{find_proc} && {write_pid}") - storm_env = format("source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH") + storm_env = format( + "source {conf_dir}/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH") if action == "start": if name == "rest_api": - process_cmd = format("{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") - cmd = format("{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1") + process_cmd = format( + "{storm_env} ; java -jar {rest_lib_dir}/`ls {rest_lib_dir} | grep -wE storm-rest-[0-9.-]+\.jar` server") + cmd = format( + "{process_cmd} {rest_api_conf_file} > {log_dir}/restapi.log 2>&1") else: cmd = format("{storm_env} ; storm {name} > {log_dir}/{name}.out 2>&1") Execute(cmd, - not_if=no_op_test, - user=params.storm_user, - wait_for_finish=False, - path=params.storm_bin_dir - ) + not_if = no_op_test, + user = params.storm_user, + wait_for_finish = False, + path = params.storm_bin_dir) + Execute(crt_pid_cmd, - user=params.storm_user, - logoutput=True, - tries=tries_count, - try_sleep=10, - path=params.storm_bin_dir - ) + user = params.storm_user, + logoutput = True, + tries = tries_count, + try_sleep = 10, + path = params.storm_bin_dir) elif action == "stop": process_dont_exist = format("! ({no_op_test})") pid = format("`cat {pid_file}`") + Execute(format("{sudo} kill {pid}"), - not_if=process_dont_exist - ) + not_if = process_dont_exist) + Execute(format("{sudo} kill -9 {pid}"), - not_if=format("sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"), - ignore_failures=True - ) - File(pid_file, - action = "delete", - ) + not_if = format( + "sleep 2; {process_dont_exist} || sleep 20; {process_dont_exist}"), + ignore_failures = True) + + File(pid_file, action = "delete") http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py index 99397ac..49dee47 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/status_params.py @@ -64,8 +64,10 @@ else: kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) tmp_dir = Script.get_tmp_dir() + storm_component_home_dir = "/usr/lib/storm" conf_dir = "/etc/storm/conf" if Script.is_hdp_stack_greater_or_equal("2.2"): + storm_component_home_dir = format("/usr/hdp/current/{component_directory}") conf_dir = format("/usr/hdp/current/{component_directory}/conf") storm_user = config['configurations']['storm-env']['storm_user'] http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py new file mode 100644 index 0000000..b25cdf8 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm_upgrade.py @@ -0,0 +1,133 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import json +import os + +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Directory +from resource_management.core.resources.system import File +from resource_management.core.resources.system import Execute +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import format +from resource_management.libraries.functions.default import default + +class StormUpgrade(Script): + """ + This class helps perform some of the upgrade tasks needed for Storm during + a non-rolling upgrade. Storm writes data to disk locally and to ZooKeeper. + If any HDP 2.2 bits exist in these directories when an HDP 2.3 instance + starts up, it will fail to start properly. Because the upgrade framework in + Ambari doesn't yet have a mechanism to say "stop all" before starting to + upgrade each component, we need to rely on a Storm trick to bring down + running daemons. By removing the ZooKeeper data with running daemons, those + daemons will die. + """ + + def delete_storm_zookeeper_data(self, env): + """ + Deletes the Storm data from ZooKeeper, effectively bringing down all + Storm daemons. + :return: + """ + import params + + Logger.info('Clearing Storm data from ZooKeeper') + + storm_zookeeper_root_dir = params.storm_zookeeper_root_dir + if storm_zookeeper_root_dir is None: + raise Fail("The storm ZooKeeper directory specified by storm-site/storm.zookeeper.root must be specified") + + # create the ZooKeeper delete command + if params.version is not None: + command = "/usr/hdp/{0}/zookeeper/bin/zkCli.sh rmr /storm".format(params.version) + else: + command = "/usr/hdp/current/zookeeper-client/bin/zkCli.sh rmr /storm" + + if params.security_enabled: + kinit_command=format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ") + Execute(kinit_command,user=params.smokeuser) + + # clean out ZK + Execute(command, user=params.storm_user, tries=1) + + + def delete_storm_local_data(self, env): + """ + Deletes Storm data from local directories. This will create a marker file + with JSON data representing the upgrade stack and request/stage ID. This + will prevent multiple Storm components on the same host from removing + the local directories more than once. + :return: + """ + import params + + Logger.info('Clearing Storm data from local directories...') + + storm_local_directory = params.local_dir + if storm_local_directory is None: + raise Fail("The storm local directory specified by storm-site/storm.local.dir must be specified") + + request_id = default("/requestId", None) + stage_id = default("/stageId", None) + stack_version = params.version + stack_name = params.stack_name + + json_map = {} + json_map["requestId"] = request_id + json_map["stageId"] = stage_id + json_map["stackVersion"] = stack_version + json_map["stackName"] = stack_name + + temp_directory = params.tmp_dir + upgrade_file = os.path.join(temp_directory, "storm-upgrade-{0}.json".format(stack_version)) + + if os.path.exists(upgrade_file): + try: + with open(upgrade_file) as file_pointer: + existing_json_map = json.load(file_pointer) + + if cmp(json_map, existing_json_map) == 0: + Logger.info("The storm upgrade has already removed the local directories for {0}-{1} for request {2} and stage {3}".format( + stack_name, stack_version, request_id, stage_id)) + + # nothing else to do here for this as it appears to have already been + # removed by another component being upgraded + return + + except: + Logger.error("The upgrade file {0} appears to be corrupt; removing...".format(upgrade_file)) + File(upgrade_file, action="delete") + else: + # delete the upgrade file since it does not match + File(upgrade_file, action="delete") + + # delete from local directory + Directory(storm_local_directory, action="delete", recursive=True) + + # recreate storm local directory + Directory(storm_local_directory, mode=0755, owner = params.storm_user, + group = params.user_group, recursive = True) + + # the file doesn't exist, so create it + with open(upgrade_file, 'w') as file_pointer: + json.dump(json_map, file_pointer, indent=2) + +if __name__ == "__main__": + StormUpgrade().execute() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py index bdb03df..335aeeb 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor.py @@ -75,6 +75,7 @@ class SupervisorDefault(Supervisor): if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: conf_select.select(params.stack_name, "storm", params.version) + hdp_select.select("storm-client", params.version) hdp_select.select("storm-supervisor", params.version) def start(self, env, rolling_restart=False): http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py index bde533f..f3074f1 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/supervisor_prod.py @@ -50,6 +50,7 @@ class Supervisor(Script): if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: conf_select.select(params.stack_name, "storm", params.version) + hdp_select.select("storm-client", params.version) hdp_select.select("storm-supervisor", params.version) def start(self, env, rolling_restart=False): http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml index 1aef735..aa52d3b 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml @@ -33,8 +33,8 @@ # The java implementation to use. export JAVA_HOME={{java64_home}} -# export STORM_CONF_DIR="" -export STORM_HOME=/usr/hdp/current/storm-client +export STORM_CONF_DIR={{conf_dir}} +export STORM_HOME={{storm_component_home_dir}} </value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml index 05aa89f..1cbdd88 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.3.xml @@ -178,6 +178,7 @@ <service name="ZOOKEEPER"> <component>ZOOKEEPER_CLIENT</component> </service> + <service name="HDFS"> <component>HDFS_CLIENT</component> </service> @@ -285,6 +286,7 @@ <function>finalize_rolling_upgrade</function> </task> </execute-stage> + <execute-stage title="Save Cluster State" service="" component=""> <task xsi:type="server_action" class="org.apache.ambari.server.serveraction.upgrades.FinalizeUpgradeAction"> </task> @@ -331,6 +333,7 @@ <value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value> </condition> </task> + <task xsi:type="configure" summary="Transitioning Ranger HDFS Policy"> <type>ranger-hdfs-policymgr-ssl</type> <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-keystore.jks" /> @@ -338,6 +341,7 @@ <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/hadoop-client/conf/ranger-plugin-truststore.jks" /> <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" mask="true" default-value="changeit" /> </task> + <task xsi:type="configure" summary="Transitioning Ranger HDFS Audit"> <type>ranger-hdfs-audit</type> <transfer operation="copy" from-type="ranger-hdfs-plugin-properties" from-key="XAAUDIT.DB.IS_ENABLED" to-key="xasecure.audit.destination.db" default-value="false"/> @@ -387,6 +391,7 @@ <transfer operation="delete" delete-key="POLICY_MGR_URL" /> </task> </pre-upgrade> + <upgrade> <task xsi:type="restart" /> </upgrade> @@ -556,6 +561,7 @@ <transfer operation="delete" delete-key="XAAUDIT.DB.PASSWORD" /> </task> </pre-upgrade> + <upgrade> <task xsi:type="restart" /> </upgrade> @@ -580,9 +586,6 @@ <task xsi:type="configure"> <type>tez-site</type> <set key="tez.am.view-acls" value="*"/> - </task> - <task xsi:type="configure"> - <type>tez-site</type> <set key="tez.task.generate.counters.per.io" value="true"/> </task> </pre-upgrade> @@ -899,17 +902,33 @@ <message>Before continuing, please deactivate and kill any currently running topologies.</message> </task> + <task xsi:type="execute" summary="Removing Storm data from ZooKeeper"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_zookeeper_data</function> + </task> + + <task xsi:type="execute" summary="Removing local Storm data"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_local_data</function> + </task> + <task xsi:type="configure" summary="Converting nimbus.host into nimbus.seeds"> <type>storm-site</type> - <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array" /> - <transfer operation="delete" delete-key="nimbus.host" /> + <transfer operation="copy" from-key="nimbus.host" to-key="nimbus.seeds" coerce-to="yaml-array"/> + <transfer operation="delete" delete-key="nimbus.host"/> + </task> + + <task xsi:type="configure" summary="Updating Storm home and configuration environment variables"> + <type>storm-env</type> + <replace key="content" find="# export STORM_CONF_DIR=""" replace-with="export STORM_CONF_DIR={{conf_dir}}"/> + <replace key="content" find="export STORM_HOME=/usr/hdp/current/storm-client" replace-with="export STORM_HOME={{storm_component_home_dir}}"/> </task> <task xsi:type="configure" summary="Configuring Ranger Storm Policy"> <type>ranger-storm-policymgr-ssl</type> - <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks"/> + <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.keystore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks"/> <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_KEYSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.keystore.password" default-value="myKeyFilePassword" mask="true"/> - <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks"/> + <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_FILE_PATH" to-key="xasecure.policymgr.clientssl.truststore" default-value="/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks"/> <transfer operation="copy" from-type="ranger-storm-plugin-properties" from-key="SSL_TRUSTSTORE_PASSWORD" to-key="xasecure.policymgr.clientssl.truststore.password" default-value="changeit" mask="true"/> </task> @@ -959,25 +978,56 @@ <task xsi:type="restart" /> </upgrade> </component> + <component name="STORM_REST_API"> + <pre-upgrade> + <task xsi:type="execute" summary="Removing local Storm data"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_local_data</function> + </task> + </pre-upgrade> <upgrade> <task xsi:type="restart" /> </upgrade> </component> + <component name="SUPERVISOR"> + <pre-upgrade> + <task xsi:type="execute" summary="Removing local Storm data"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_local_data</function> + </task> + </pre-upgrade> <upgrade> <task xsi:type="restart" /> </upgrade> </component> + <component name="STORM_UI_SERVER"> + <pre-upgrade> + <task xsi:type="execute" summary="Removing local Storm data"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_local_data</function> + </task> + </pre-upgrade> + <upgrade> <task xsi:type="restart" /> </upgrade> </component> + <component name="DRPC_SERVER"> + <pre-upgrade> + <task xsi:type="execute" summary="Removing local Storm data"> + <script>scripts/storm_upgrade.py</script> + <function>delete_storm_local_data</function> + </task> + </pre-upgrade> + <upgrade> <task xsi:type="restart" /> </upgrade> + <post-upgrade> <task xsi:type="manual"> <message>Please rebuild your topology using the new Storm version dependencies and resubmit it using the newly created jar.</message> http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml index 4600a14..855e6fd 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-policymgr-ssl.xml @@ -22,7 +22,7 @@ <property> <name>xasecure.policymgr.clientssl.keystore</name> - <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-keystore.jks</value> + <value>/usr/hdp/current/storm-client/conf/ranger-plugin-keystore.jks</value> <description>Java Keystore files</description> </property> @@ -34,7 +34,7 @@ <property> <name>xasecure.policymgr.clientssl.truststore</name> - <value>/usr/hdp/current/storm-nimbus/conf/ranger-plugin-truststore.jks</value> + <value>/usr/hdp/current/storm-client/conf/ranger-plugin-truststore.jks</value> <description>java truststore file</description> </property> http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml index 33567c8..f26be4d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/STORM/configuration/ranger-storm-security.xml @@ -40,7 +40,7 @@ <property> <name>ranger.plugin.storm.policy.rest.ssl.config.file</name> - <value>/usr/hdp/current/storm-nimbus/conf/ranger-policymgr-ssl.xml</value> + <value>/usr/hdp/current/storm-client/conf/ranger-policymgr-ssl.xml</value> <description>Path to the file containing SSL details to contact Ranger Admin</description> </property> http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java index 66efea1..8d21b80 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapperTest.java @@ -52,7 +52,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; public class ExecutionCommandWrapperTest { - + private static final String HOST1 = "dev01.ambari.apache.org"; private static final String CLUSTER1 = "c1"; private static final String CLUSTER_VERSION_TAG = "clusterVersion"; @@ -87,7 +87,7 @@ public class ExecutionCommandWrapperTest { private static Map<String, String> SERVICE_SITE_SERVICE; private static Map<String, String> SERVICE_SITE_HOST; private static Map<String, Map<String, String>> CONFIG_ATTRIBUTES; - + private static Injector injector; private static Clusters clusters; private static ConfigFactory configFactory; @@ -101,35 +101,35 @@ public class ExecutionCommandWrapperTest { configHelper = injector.getInstance(ConfigHelper.class); configFactory = injector.getInstance(ConfigFactory.class); stageFactory = injector.getInstance(StageFactory.class); - + clusters = injector.getInstance(Clusters.class); clusters.addHost(HOST1); clusters.getHost(HOST1).persist(); clusters.addCluster(CLUSTER1, new StackId("HDP-0.1")); - + Cluster cluster1 = clusters.getCluster(CLUSTER1); - + SERVICE_SITE_CLUSTER = new HashMap<String, String>(); SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1); SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2); SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3); SERVICE_SITE_CLUSTER.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4); - + SERVICE_SITE_SERVICE = new HashMap<String, String>(); SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1_S); SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_S); SERVICE_SITE_SERVICE.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5_S); - + SERVICE_SITE_HOST = new HashMap<String, String>(); SERVICE_SITE_HOST.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H); SERVICE_SITE_HOST.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H); - + GLOBAL_CLUSTER = new HashMap<String, String>(); GLOBAL_CLUSTER.put(GLOBAL_NAME1, GLOBAL_CLUSTER_VAL1); GLOBAL_CLUSTER.put(GLOBAL_NAME2, GLOBAL_CLUSTER_VAL2); - + CONFIG_ATTRIBUTES = new HashMap<String, Map<String,String>>(); - + //Cluster level global config Config globalConfig = configFactory.createNew(cluster1, GLOBAL_CONFIG, GLOBAL_CLUSTER, CONFIG_ATTRIBUTES); globalConfig.setTag(CLUSTER_VERSION_TAG); @@ -139,25 +139,25 @@ public class ExecutionCommandWrapperTest { Config serviceSiteConfigCluster = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_CLUSTER, CONFIG_ATTRIBUTES); serviceSiteConfigCluster.setTag(CLUSTER_VERSION_TAG); cluster1.addConfig(serviceSiteConfigCluster); - + //Service level service config Config serviceSiteConfigService = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_SERVICE, CONFIG_ATTRIBUTES); serviceSiteConfigService.setTag(SERVICE_VERSION_TAG); cluster1.addConfig(serviceSiteConfigService); - + //Host level service config Config serviceSiteConfigHost = configFactory.createNew(cluster1, SERVICE_SITE_CONFIG, SERVICE_SITE_HOST, CONFIG_ATTRIBUTES); serviceSiteConfigHost.setTag(HOST_VERSION_TAG); cluster1.addConfig(serviceSiteConfigHost); - + ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class); - + createTask(db, 1, 1, HOST1, CLUSTER1); - + } - + private static void createTask(ActionDBAccessor db, long requestId, long stageId, String hostName, String clusterName) throws AmbariException { - + Stage s = stageFactory.createNew(requestId, "/var/log", clusterName, 1L, "execution command wrapper test", "clusterHostInfo", "commandParamsStage", "hostParamsStage"); s.setStageId(stageId); s.addHostRoleExecutionCommand(hostName, Role.NAMENODE, @@ -169,36 +169,36 @@ public class ExecutionCommandWrapperTest { Request request = new Request(stages, clusters); db.persistActions(request); } - + @Test public void testGetExecutionCommand() throws JSONException, AmbariException { - - + + Map<String, Map<String, String>> confs = new HashMap<String, Map<String, String>>(); Map<String, String> configurationsGlobal = new HashMap<String, String>(); configurationsGlobal.put(GLOBAL_NAME1, GLOBAL_VAL1); confs.put(GLOBAL_CONFIG, configurationsGlobal); - + Map<String, Map<String, String>> confTags = new HashMap<String, Map<String, String>>(); Map<String, String> confTagServiceSite = new HashMap<String, String>(); - + confTagServiceSite.put("tag", CLUSTER_VERSION_TAG); confTagServiceSite.put("service_override_tag", SERVICE_VERSION_TAG); confTagServiceSite.put("host_override_tag", HOST_VERSION_TAG); - + confTags.put(SERVICE_SITE_CONFIG, confTagServiceSite); - + Map<String, String> confTagGlobal = Collections.singletonMap("tag", CLUSTER_VERSION_TAG); - + confTags.put(GLOBAL_CONFIG, confTagGlobal); - - + + ExecutionCommand executionCommand = new ExecutionCommand(); - - + + executionCommand.setClusterName(CLUSTER1); executionCommand.setTaskId(1); - executionCommand.setCommandId("1-1"); + executionCommand.setRequestAndStage(1, 1); executionCommand.setHostname(HOST1); executionCommand.setRole("NAMENODE"); executionCommand.setRoleParams(Collections.<String, String>emptyMap()); @@ -208,63 +208,63 @@ public class ExecutionCommandWrapperTest { executionCommand.setServiceName("HDFS"); executionCommand.setCommandType(AgentCommandType.EXECUTION_COMMAND); executionCommand.setCommandParams(Collections.<String, String>emptyMap()); - + String json = StageUtils.getGson().toJson(executionCommand, ExecutionCommand.class); ExecutionCommandWrapper execCommWrap = new ExecutionCommandWrapper(json); ExecutionCommand processedExecutionCommand = execCommWrap.getExecutionCommand(); - + Map<String, String> serviceSiteConfig = processedExecutionCommand.getConfigurations().get(SERVICE_SITE_CONFIG); - + Assert.assertEquals(SERVICE_SITE_VAL1_S, serviceSiteConfig.get(SERVICE_SITE_NAME1)); Assert.assertEquals(SERVICE_SITE_VAL2_H, serviceSiteConfig.get(SERVICE_SITE_NAME2)); Assert.assertEquals(SERVICE_SITE_VAL3, serviceSiteConfig.get(SERVICE_SITE_NAME3)); Assert.assertEquals(SERVICE_SITE_VAL4, serviceSiteConfig.get(SERVICE_SITE_NAME4)); Assert.assertEquals(SERVICE_SITE_VAL5_S, serviceSiteConfig.get(SERVICE_SITE_NAME5)); Assert.assertEquals(SERVICE_SITE_VAL6_H, serviceSiteConfig.get(SERVICE_SITE_NAME6)); - + Map<String, String> globalConfig = processedExecutionCommand.getConfigurations().get(GLOBAL_CONFIG); - + Assert.assertEquals(GLOBAL_VAL1, globalConfig.get(GLOBAL_NAME1)); Assert.assertEquals(GLOBAL_CLUSTER_VAL2, globalConfig.get(GLOBAL_NAME2)); - + //Union of all keys of service site configs Set<String> serviceSiteKeys = new HashSet<String>(); serviceSiteKeys.addAll(SERVICE_SITE_CLUSTER.keySet()); serviceSiteKeys.addAll(SERVICE_SITE_SERVICE.keySet()); serviceSiteKeys.addAll(SERVICE_SITE_HOST.keySet()); - + Assert.assertEquals(serviceSiteKeys.size(), serviceSiteConfig.size()); - + } - + @Test public void testGetMergedConfig() { Map<String, String> baseConfig = new HashMap<String, String>(); - + baseConfig.put(SERVICE_SITE_NAME1, SERVICE_SITE_VAL1); baseConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2); baseConfig.put(SERVICE_SITE_NAME3, SERVICE_SITE_VAL3); baseConfig.put(SERVICE_SITE_NAME4, SERVICE_SITE_VAL4); baseConfig.put(SERVICE_SITE_NAME5, SERVICE_SITE_VAL5); - + Map<String, String> overrideConfig = new HashMap<String, String>(); - + overrideConfig.put(SERVICE_SITE_NAME2, SERVICE_SITE_VAL2_H); overrideConfig.put(SERVICE_SITE_NAME6, SERVICE_SITE_VAL6_H); - - + + Map<String, String> mergedConfig = configHelper.getMergedConfig(baseConfig, overrideConfig); - - + + Set<String> configsKeys = new HashSet<String>(); configsKeys.addAll(baseConfig.keySet()); configsKeys.addAll(overrideConfig.keySet()); - + Assert.assertEquals(configsKeys.size(), mergedConfig.size()); - + Assert.assertEquals(SERVICE_SITE_VAL1, mergedConfig.get(SERVICE_SITE_NAME1)); Assert.assertEquals(SERVICE_SITE_VAL2_H, mergedConfig.get(SERVICE_SITE_NAME2)); Assert.assertEquals(SERVICE_SITE_VAL3, mergedConfig.get(SERVICE_SITE_NAME3)); http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java index ca1a5a0..15d7904 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java @@ -84,8 +84,17 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.OrmTestHelper; -import org.apache.ambari.server.orm.dao.*; -import org.apache.ambari.server.orm.entities.*; +import org.apache.ambari.server.orm.dao.ClusterDAO; +import org.apache.ambari.server.orm.dao.HostDAO; +import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.dao.ResourceTypeDAO; +import org.apache.ambari.server.orm.dao.StackDAO; +import org.apache.ambari.server.orm.entities.ClusterEntity; +import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; +import org.apache.ambari.server.orm.entities.ResourceEntity; +import org.apache.ambari.server.orm.entities.ResourceTypeEntity; +import org.apache.ambari.server.orm.entities.StackEntity; import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriter; import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileWriterFactory; import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction; @@ -229,7 +238,7 @@ public class TestHeartbeatHandler { hostObject.setState(HostState.UNHEALTHY); ExecutionCommand execCmd = new ExecutionCommand(); - execCmd.setCommandId("2-34"); + execCmd.setRequestAndStage(2, 34); execCmd.setHostname(DummyHostname1); aq.enqueue(DummyHostname1, new ExecutionCommand()); HeartBeat hb = new HeartBeat(); @@ -2318,11 +2327,11 @@ public class TestHeartbeatHandler { clusterEntity.setDesiredStack(stackEntity); clusterDAO.create(clusterEntity); - + StackId stackId = new StackId(DummyStackId); - + Cluster cluster = clusters.getCluster(DummyCluster); - + cluster.setDesiredStackVersion(stackId); cluster.setCurrentStackVersion(stackId); helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion()); @@ -2520,7 +2529,7 @@ public class TestHeartbeatHandler { hostObject.setState(HostState.UNHEALTHY); ExecutionCommand execCmd = new ExecutionCommand(); - execCmd.setCommandId("2-34"); + execCmd.setRequestAndStage(2, 34); execCmd.setHostname(DummyHostname1); aq.enqueue(DummyHostname1, new ExecutionCommand()); http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py index f79fa6d..4548b8d 100644 --- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py @@ -149,6 +149,7 @@ class TestStormNimbus(TestStormBase): hdp_stack_version = self.STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067") self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067") def test_pre_rolling_restart_23(self): @@ -168,6 +169,7 @@ class TestStormNimbus(TestStormBase): call_mocks = [(0, None), (0, None)], mocks_dict = mocks_dict) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234") self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234") self.assertEquals(2, mocks_dict['call'].call_count) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py index 17c0c6b..85a4a2c 100644 --- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus_prod.py @@ -114,6 +114,7 @@ class TestStormNimbus(TestStormBase): hdp_stack_version = self.STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067") self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.2.1.0-2067") def test_pre_rolling_restart_23(self): @@ -133,6 +134,7 @@ class TestStormNimbus(TestStormBase): call_mocks = [(0, None), (0, None)], mocks_dict = mocks_dict) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234") self.assertResourceCalled("Execute", "hdp-select set storm-nimbus 2.3.0.0-1234") self.assertEquals(2, mocks_dict['call'].call_count) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py index 38b7b64..c4a261e 100644 --- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor.py @@ -194,6 +194,7 @@ class TestStormSupervisor(TestStormBase): hdp_stack_version = self.STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067") self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067") def test_pre_rolling_restart_23(self): @@ -213,6 +214,7 @@ class TestStormSupervisor(TestStormBase): call_mocks = [(0, None), (0, None)], mocks_dict = mocks_dict) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234") self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234") self.assertEquals(2, mocks_dict['call'].call_count) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad2f5442/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py index a6dc423..926e57e 100644 --- a/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py +++ b/ambari-server/src/test/python/stacks/2.1/STORM/test_storm_supervisor_prod.py @@ -163,6 +163,7 @@ class TestStormSupervisor(TestStormBase): hdp_stack_version = self.STACK_VERSION, target = RMFTestCase.TARGET_COMMON_SERVICES) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.2.1.0-2067") self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.2.1.0-2067") def test_pre_rolling_restart_23(self): @@ -182,6 +183,7 @@ class TestStormSupervisor(TestStormBase): call_mocks = [(0, None), (0, None)], mocks_dict = mocks_dict) + self.assertResourceCalled("Execute", "hdp-select set storm-client 2.3.0.0-1234") self.assertResourceCalled("Execute", "hdp-select set storm-supervisor 2.3.0.0-1234") self.assertEquals(2, mocks_dict['call'].call_count)
