AMBARI-8836 - Upgrade pack for Hive (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/181f3ab4 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/181f3ab4 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/181f3ab4 Branch: refs/heads/trunk Commit: 181f3ab4dea07151aaded509ad55b55d0919f60b Parents: fad5674 Author: Jonathan Hurley <[email protected]> Authored: Fri Dec 19 14:58:23 2014 -0500 Committer: Jonathan Hurley <[email protected]> Committed: Fri Dec 19 16:52:41 2014 -0500 ---------------------------------------------------------------------- .../serveraction/upgrades/ConfigureAction.java | 75 +- .../ambari/server/state/ConfigHelper.java | 241 +++--- .../server/upgrade/UpgradeCatalog170.java | 63 +- .../package/scripts/hive_metastore.py | 39 +- .../0.12.0.2.0/package/scripts/hive_server.py | 28 +- .../package/scripts/hive_server_upgrade.py | 85 ++ .../0.12.0.2.0/package/scripts/hive_service.py | 34 +- .../HIVE/0.12.0.2.0/package/scripts/params.py | 15 +- .../package/scripts/webhcat_server.py | 15 +- .../stacks/HDP/2.2/upgrades/upgrade-2.2.xml | 51 +- .../stacks/2.0.6/HIVE/test_hive_server.py | 63 +- .../python/stacks/2.2/configs/hive-upgrade.json | 845 +++++++++++++++++++ 12 files changed, 1370 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java index 4474d05..549f9fa 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ConfigureAction.java @@ -17,23 +17,94 @@ */ package org.apache.ambari.server.serveraction.upgrades; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.ConfigurationRequest; import org.apache.ambari.server.serveraction.AbstractServerAction; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.DesiredConfig; + +import com.google.inject.Inject; /** * Action that represents a manual stage. */ public class ConfigureAction extends AbstractServerAction { + /** + * Used to lookup the cluster. + */ + @Inject + private Clusters m_clusters; + + /** + * Used to update the configuration properties. + */ + @Inject + private AmbariManagementController m_controller; + + /** + * Used to assist in the creation of a {@link ConfigurationRequest} to update + * configuration values. + */ + @Inject + private ConfigHelper m_configHelper; + + /** + * {@inheritDoc} + */ @Override public CommandReport execute( ConcurrentMap<String, Object> requestSharedDataContext) throws AmbariException, InterruptedException { - // TODO Auto-generated method stub - return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", "", ""); + + Map<String,String> commandParameters = getCommandParameters(); + if( null == commandParameters || commandParameters.isEmpty() ){ + return createCommandReport(0, HostRoleStatus.FAILED, "{}", "", + "Unable to change configuration values without command parameters"); + } + + String clusterName = commandParameters.get("clusterName"); + String key = commandParameters.get("key"); + String value = commandParameters.get("value"); + + // such as hdfs-site or hbase-env + String configType = commandParameters.get("type"); + + if (null == clusterName || null == configType || null == key) { + String message = "cluster={0}, type={1}, key={2}"; + message = MessageFormat.format(message, clusterName, configType, key); + + return createCommandReport(0, HostRoleStatus.FAILED, "{}", "", message); + } + + Cluster cluster = m_clusters.getCluster(clusterName); + Map<String, DesiredConfig> desiredConfigs = cluster.getDesiredConfigs(); + DesiredConfig desiredConfig = desiredConfigs.get(configType); + Config config = cluster.getConfig(configType, desiredConfig.getTag()); + + Map<String, String> propertiesToChange = new HashMap<String, String>(); + propertiesToChange.put(key, value); + config.updateProperties(propertiesToChange); + + String serviceVersionNote = "Stack Upgrade"; + + m_configHelper.createConfigType(cluster, m_controller, configType, + config.getProperties(), m_controller.getAuthName(), serviceVersionNote); + + String message = "Updated ''{0}'' with ''{1}={2}''"; + message = MessageFormat.format(message, configType, key, value); + + return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", message, ""); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index 42f9601..b64e9ce 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -31,16 +31,8 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.inject.Singleton; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.api.services.AmbariMetaInfo; - -import com.google.inject.Inject; -import com.google.inject.persist.Transactional; - import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.ConfigurationRequest; @@ -52,6 +44,12 @@ import org.apache.ambari.server.upgrade.UpgradeCatalog170; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.persist.Transactional; + /** * Helper class that works with config traversals. */ @@ -89,9 +87,9 @@ public class ConfigHelper { */ public Map<String, Map<String, String>> getEffectiveDesiredTags( Cluster cluster, String hostName) throws AmbariException { - + Host host = clusters.getHost(hostName); - + return getEffectiveDesiredTags(cluster, host.getDesiredHostConfigs(cluster)); } @@ -103,15 +101,15 @@ public class ConfigHelper { */ private Map<String, Map<String, String>> getEffectiveDesiredTags( Cluster cluster, Map<String, HostConfig> hostConfigOverrides) { - + Map<String, DesiredConfig> clusterDesired = cluster.getDesiredConfigs(); - + Map<String, Map<String,String>> resolved = new TreeMap<String, Map<String, String>>(); - + // Do not use host component config mappings. Instead, the rules are: // 1) Use the cluster desired config // 2) override (1) with config-group overrides - + for (Entry<String, DesiredConfig> clusterEntry : clusterDesired.entrySet()) { String type = clusterEntry.getKey(); String tag = clusterEntry.getValue().getTag(); @@ -394,7 +392,7 @@ public class ConfigHelper { public void invalidateStaleConfigsCache(ServiceComponentHost sch) { staleConfigsCache.invalidate(sch); } - + /** * Remove configs by type * @param type config Type @@ -402,15 +400,15 @@ public class ConfigHelper { @Transactional public void removeConfigsByType(Cluster cluster, String type) { Set<String> globalVersions = cluster.getConfigsByType(type).keySet(); - + for(String version:globalVersions) { ClusterConfigEntity clusterConfigEntity = clusterDAO.findConfig (cluster.getClusterId(), type, version); - + clusterDAO.removeConfig(clusterConfigEntity); } } - + /** * Gets all the config dictionary where property with the given name is present in stack definitions * @param stackId @@ -419,30 +417,30 @@ public class ConfigHelper { public Set<String> findConfigTypesByPropertyName(StackId stackId, String propertyName, String clusterName) throws AmbariException { StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - + Set<String> result = new HashSet<String>(); for(Service service : clusters.getCluster(clusterName).getServices().values()) { Set<PropertyInfo> stackProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), service.getName()); Set<PropertyInfo> stackLevelProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion()); stackProperties.addAll(stackLevelProperties); - + for (PropertyInfo stackProperty : stackProperties) { if(stackProperty.getName().equals(propertyName)) { String configType = fileNameToConfigType(stackProperty.getFilename()); - + result.add(configType); } } } - + return result; } - + public Set<String> getPropertyValuesWithPropertyType(StackId stackId, PropertyType propertyType, Cluster cluster) throws AmbariException { StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - + Set<String> result = new HashSet<String>(); for(Service service : cluster.getServices().values()) { @@ -456,99 +454,123 @@ public class ConfigHelper { } } } - + Set<PropertyInfo> stackProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion()); - + for (PropertyInfo stackProperty : stackProperties) { if(stackProperty.getPropertyTypes().contains(propertyType)) { String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename()); result.add(cluster.getDesiredConfigByType(stackPropertyConfigType).getProperties().get(stackProperty.getName())); } } - + return result; } - + public String getPropertyValueFromStackDefinitions(Cluster cluster, String configType, String propertyName) throws AmbariException { StackId stackId = cluster.getCurrentStackVersion(); StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - + for(ServiceInfo serviceInfo:stack.getServices()) { Set<PropertyInfo> serviceProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceInfo.getName()); Set<PropertyInfo> stackProperties = ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion()); serviceProperties.addAll(stackProperties); - + for (PropertyInfo stackProperty : serviceProperties) { String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename()); - + if(stackProperty.getName().equals(propertyName) && stackPropertyConfigType.equals(configType)) { return stackProperty.getValue(); } } - + } - + return null; } - + public ServiceInfo getPropertyOwnerService(Cluster cluster, String configType, String propertyName) throws AmbariException { StackId stackId = cluster.getCurrentStackVersion(); StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - - for(ServiceInfo serviceInfo:stack.getServices()) { + + for(ServiceInfo serviceInfo:stack.getServices()) { Set<PropertyInfo> serviceProperties = ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceInfo.getName()); - + for (PropertyInfo stackProperty : serviceProperties) { String stackPropertyConfigType = fileNameToConfigType(stackProperty.getFilename()); - + if(stackProperty.getName().equals(propertyName) && stackPropertyConfigType.equals(configType)) { return serviceInfo; } } - + } - + return null; } - + public Set<PropertyInfo> getServiceProperties(Cluster cluster, String serviceName) throws AmbariException { StackId stackId = cluster.getCurrentStackVersion(); StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - + return ambariMetaInfo.getServiceProperties(stack.getName(), stack.getVersion(), serviceName); } - + public Set<PropertyInfo> getStackProperties(Cluster cluster) throws AmbariException { StackId stackId = cluster.getCurrentStackVersion(); StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); - + return ambariMetaInfo.getStackProperties(stack.getName(), stack.getVersion()); } - - public void createConfigType(Cluster cluster, AmbariManagementController ambariManagementController, - String configType, Map<String, String> properties, String authName) throws AmbariException { - String tag; - if(cluster.getConfigsByType(configType) == null) { - tag = "version1"; - } else { + + /** + * A helper method to create a new {@link Config} for a given configuration + * type. This method will perform the following tasks: + * <ul> + * <li>Create a {@link Config} in the cluster for the specified type. This + * will have the proper versions and tags set automatically.</li> + * <li>Set the cluster's {@link DesiredConfig} to the new configuration</li> + * <li>Create an entry in the configuration history with a note and username.</li> + * <ul> + * + * @param cluster + * @param controller + * @param configType + * @param properties + * @param authenticatedUserName + * @param serviceVersionNote + * @throws AmbariException + */ + public void createConfigType(Cluster cluster, + AmbariManagementController controller, String configType, + Map<String, String> properties, String authenticatedUserName, + String serviceVersionNote) throws AmbariException { + + String tag = "version1"; + if (cluster.getConfigsByType(configType) != null) { tag = "version" + System.currentTimeMillis(); } - - ConfigurationRequest cr = new ConfigurationRequest(); - cr.setClusterName(cluster.getClusterName()); - cr.setVersionTag(tag); - cr.setType(configType); - cr.setProperties(properties); - ambariManagementController.createConfiguration(cr); - - Config baseConfig = cluster.getConfig(cr.getType(), cr.getVersionTag()); - + + // update the configuration + ConfigurationRequest configurationRequest = new ConfigurationRequest(); + configurationRequest.setClusterName(cluster.getClusterName()); + configurationRequest.setVersionTag(tag); + configurationRequest.setType(configType); + configurationRequest.setProperties(properties); + configurationRequest.setServiceConfigVersionNote(serviceVersionNote); + controller.createConfiguration(configurationRequest); + + // create the configuration history entry + Config baseConfig = cluster.getConfig(configurationRequest.getType(), + configurationRequest.getVersionTag()); + if (baseConfig != null) { - cluster.addDesiredConfig(authName, Collections.singleton(baseConfig)); + cluster.addDesiredConfig(authenticatedUserName, + Collections.singleton(baseConfig), serviceVersionNote); } } - + /** * Since global configs are deprecated since 1.7.0, but still supported. * We should automatically map any globals used, to *-env dictionaries. @@ -557,57 +579,58 @@ public class ConfigHelper { */ public void moveDeprecatedGlobals(StackId stackId, Map<String, Map<String, String>> configurations, String clusterName) { Map<String, String> globalConfigurations = new HashMap<String, String>(); - + if(configurations.get(Configuration.GLOBAL_CONFIG_TAG) == null || - configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0) + configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0) { return; - + } + globalConfigurations.putAll(configurations.get(Configuration.GLOBAL_CONFIG_TAG)); - + if(globalConfigurations!=null && globalConfigurations.size() != 0) { LOG.warn("Global configurations are deprecated, " + "please use *-env"); } - + for(Map.Entry<String, String> property:globalConfigurations.entrySet()) { String propertyName = property.getKey(); String propertyValue = property.getValue(); - + Set<String> newConfigTypes = null; try{ - newConfigTypes = this.findConfigTypesByPropertyName(stackId, propertyName, clusterName); + newConfigTypes = findConfigTypesByPropertyName(stackId, propertyName, clusterName); } catch(AmbariException e) { LOG.error("Exception while getting configurations from the stacks", e); return; } - + newConfigTypes.remove(Configuration.GLOBAL_CONFIG_TAG); - + String newConfigType = null; if(newConfigTypes.size() > 0) { newConfigType = newConfigTypes.iterator().next(); } else { newConfigType = UpgradeCatalog170.getAdditionalMappingGlobalToEnv().get(propertyName); } - + if(newConfigType==null) { LOG.warn("Cannot find where to map " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG + " (value="+propertyValue+")"); continue; } - - LOG.info("Mapping config " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG + + + LOG.info("Mapping config " + propertyName + " from " + Configuration.GLOBAL_CONFIG_TAG + " to " + newConfigType + " (value="+propertyValue+")"); - + configurations.get(Configuration.GLOBAL_CONFIG_TAG).remove(propertyName); - + if(!configurations.containsKey(newConfigType)) { configurations.put(newConfigType, new HashMap<String, String>()); } configurations.get(newConfigType).put(propertyName, propertyValue); } - + if(configurations.get(Configuration.GLOBAL_CONFIG_TAG).size() == 0) { configurations.remove(Configuration.GLOBAL_CONFIG_TAG); } @@ -620,15 +643,16 @@ public class ConfigHelper { } Map <String, HostConfig> actual = sch.getActualConfigs(); - if (null == actual || actual.isEmpty()) + if (null == actual || actual.isEmpty()) { return false; + } Cluster cluster = clusters.getClusterById(sch.getClusterId()); StackId stackId = cluster.getDesiredStackVersion(); - + Map<String, Map<String, String>> desired = getEffectiveDesiredTags(cluster, sch.getHostName()); - + ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(), sch.getServiceName()); ComponentInfo componentInfo = serviceInfo.getComponentByName(sch.getServiceComponentName()); @@ -643,13 +667,13 @@ public class ConfigHelper { boolean stale = false; Iterator<Entry<String, Map<String, String>>> it = desired.entrySet().iterator(); - + while (it.hasNext() && !stale) { Entry<String, Map<String, String>> desiredEntry = it.next(); - + String type = desiredEntry.getKey(); Map<String, String> tags = desiredEntry.getValue(); - + if (!actual.containsKey(type)) { // desired is set, but actual is not if (!serviceInfo.hasConfigDependency(type)) { @@ -658,7 +682,7 @@ public class ConfigHelper { // find out if the keys are stale by first checking the target service, // then all services Collection<String> keys = mergeKeyNames(cluster, type, tags.values()); - + if (serviceInfo.hasDependencyAndPropertyFor(type, keys) || !hasPropertyFor(stackId, type, keys)) { stale = true; } @@ -723,51 +747,55 @@ public class ConfigHelper { for (ServiceInfo svc : ambariMetaInfo.getServices(stack.getStackName(), stack.getStackVersion()).values()) { - - if (svc.hasDependencyAndPropertyFor(type, keys)) + + if (svc.hasDependencyAndPropertyFor(type, keys)) { return true; - + } + } - + return false; } - + /** * @return the keys that have changed values */ private Collection<String> findChangedKeys(Cluster cluster, String type, Collection<String> desiredTags, Collection<String> actualTags) { - + Map<String, String> desiredValues = new HashMap<String, String>(); Map<String, String> actualValues = new HashMap<String, String>(); - + for (String tag : desiredTags) { Config config = cluster.getConfig(type, tag); - if (null != config) + if (null != config) { desiredValues.putAll(config.getProperties()); + } } - + for (String tag : actualTags) { Config config = cluster.getConfig(type, tag); - if (null != config) + if (null != config) { actualValues.putAll(config.getProperties()); + } } - + List<String> keys = new ArrayList<String>(); - + for (Entry<String, String> entry : desiredValues.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); - - if (!actualValues.containsKey(key)) + + if (!actualValues.containsKey(key)) { keys.add(key); - else if (!actualValues.get(key).equals(value)) + } else if (!actualValues.get(key).equals(value)) { keys.add(key); + } } - + return keys; } - + /** * @return the map of tags for a desired config */ @@ -781,13 +809,14 @@ public class ConfigHelper { } return map; } - + /** * @return true if the tags are different in any way, even if not-specified */ private boolean isTagChanged(Map<String, String> desiredTags, Map<String, String> actualTags, boolean groupSpecificConfigs) { - if (!actualTags.get(CLUSTER_DEFAULT_TAG).equals(desiredTags.get(CLUSTER_DEFAULT_TAG)) && !groupSpecificConfigs) + if (!actualTags.get(CLUSTER_DEFAULT_TAG).equals(desiredTags.get(CLUSTER_DEFAULT_TAG)) && !groupSpecificConfigs) { return true; + } // if the host has group specific configs for type we should ignore the cluster level configs and compare specifics if (groupSpecificConfigs) { @@ -807,14 +836,14 @@ public class ConfigHelper { */ private Collection<String> mergeKeyNames(Cluster cluster, String type, Collection<String> tags) { Set<String> names = new HashSet<String>(); - + for (String tag : tags) { Config config = cluster.getConfig(type, tag); if (null != config) { names.addAll(config.getProperties().keySet()); } } - + return names; } http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java index bb151b9..a7736dd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog170.java @@ -18,9 +18,30 @@ package org.apache.ambari.server.upgrade; -import com.google.common.reflect.TypeToken; -import com.google.inject.Inject; -import com.google.inject.Injector; +import java.lang.reflect.Type; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Expression; +import javax.persistence.criteria.Predicate; +import javax.persistence.criteria.Root; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -70,35 +91,16 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Config; import org.apache.ambari.server.state.ConfigHelper; -import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.state.State; +import org.apache.ambari.server.state.alert.Scope; import org.apache.ambari.server.utils.StageUtils; import org.apache.ambari.server.view.ViewRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.persistence.EntityManager; -import javax.persistence.TypedQuery; -import javax.persistence.criteria.CriteriaBuilder; -import javax.persistence.criteria.CriteriaQuery; -import javax.persistence.criteria.Expression; -import javax.persistence.criteria.Predicate; -import javax.persistence.criteria.Root; -import java.lang.reflect.Type; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; +import com.google.common.reflect.TypeToken; +import com.google.inject.Inject; +import com.google.inject.Injector; /** * Upgrade catalog for version 1.7.0. @@ -1110,11 +1112,12 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog { Config oldConfig = cluster.getDesiredConfigByType(PIG_PROPERTIES_CONFIG_TYPE); if (oldConfig != null) { Map<String, String> properties = oldConfig.getProperties(); - + if(!properties.containsKey(CONTENT_FIELD_NAME)) { String value = properties.remove(PIG_CONTENT_FIELD_NAME); properties.put(CONTENT_FIELD_NAME, value); - configHelper.createConfigType(cluster, ambariManagementController, PIG_PROPERTIES_CONFIG_TYPE, properties, "ambari-upgrade"); + configHelper.createConfigType(cluster, ambariManagementController, + PIG_PROPERTIES_CONFIG_TYPE, properties, "ambari-upgrade", null); } } } @@ -1212,7 +1215,9 @@ public class UpgradeCatalog170 extends AbstractUpgradeCatalog { // if have some custom properties, for own services etc., leave that as it was if(unmappedGlobalProperties.size() != 0) { LOG.info("Not deleting globals because have custom properties"); - configHelper.createConfigType(cluster, ambariManagementController, Configuration.GLOBAL_CONFIG_TAG, unmappedGlobalProperties, "ambari-upgrade"); + configHelper.createConfigType(cluster, ambariManagementController, + Configuration.GLOBAL_CONFIG_TAG, unmappedGlobalProperties, + "ambari-upgrade", null); } else { configHelper.removeConfigsByType(cluster, Configuration.GLOBAL_CONFIG_TAG); } http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py index dc02a7d..84a76ea 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_metastore.py @@ -23,42 +23,55 @@ from resource_management import * from hive import hive from hive_service import hive_service -from mysql_service import mysql_service -class HiveMetastore(Script): +class HiveMetastore(Script): def install(self, env): import params - self.install_packages(env, exclude_packages=params.hive_exclude_packages) + + self.install_packages(env, exclude_packages = params.hive_exclude_packages) + def configure(self, env): import params + env.set_params(params) - hive(name='metastore') + hive(name = 'metastore') + - def start(self, env, rolling_restart=False): + def start(self, env, rolling_restart = False): import params + env.set_params(params) - self.configure(env) # FOR SECURITY - hive_service( 'metastore', - action = 'start' - ) + self.configure(env) # FOR SECURITY + hive_service('metastore', action = 'start') + - def stop(self, env, rolling_restart=False): + def stop(self, env, rolling_restart = False): import params + env.set_params(params) + hive_service('metastore', action = 'stop' ) - hive_service( 'metastore', - action = 'stop' - ) def status(self, env): import status_params + env.set_params(status_params) pid_file = format("{hive_pid_dir}/{hive_metastore_pid}") # Recursively check all existing gmetad pid files check_process_status(pid_file) + + def pre_rolling_restart(self, env): + Logger.info("Executing Metastore Rolling Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: + Execute(format("hdp-select set hive-metastore {version}")) + + if __name__ == "__main__": HiveMetastore().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py index fa8ece4..12efae8 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server.py @@ -17,6 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. """ +import hive_server_upgrade from resource_management import * from hive import hive @@ -30,13 +31,16 @@ class HiveServer(Script): import params self.install_packages(env, exclude_packages=params.hive_exclude_packages) + def configure(self, env): import params env.set_params(params) if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >=0): install_tez_jars() + hive(name='hiveserver2') + def start(self, env, rolling_restart=False): import params env.set_params(params) @@ -46,25 +50,37 @@ class HiveServer(Script): copy_tarballs_to_hdfs('mapreduce', params.tez_user, params.hdfs_user, params.user_group) copy_tarballs_to_hdfs('tez', params.tez_user, params.hdfs_user, params.user_group) - hive_service( 'hiveserver2', - action = 'start' - ) + hive_service( 'hiveserver2', action = 'start', + rolling_restart=rolling_restart ) + def stop(self, env, rolling_restart=False): import params env.set_params(params) - hive_service( 'hiveserver2', - action = 'stop' - ) + if rolling_restart: + hive_server_upgrade.pre_upgrade_deregister() + else: + hive_service( 'hiveserver2', action = 'stop' ) + def status(self, env): import status_params env.set_params(status_params) pid_file = format("{hive_pid_dir}/{hive_pid}") + # Recursively check all existing gmetad pid files check_process_status(pid_file) + def pre_rolling_restart(self, env): + Logger.info("Executing HiveServer2 Rolling Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: + Execute(format("hdp-select set hive-server2 {version}")) + + if __name__ == "__main__": HiveServer().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py new file mode 100644 index 0000000..653d4bd --- /dev/null +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_server_upgrade.py @@ -0,0 +1,85 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import re +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Execute +from resource_management.core.shell import call +from resource_management.libraries.functions import format + + +def pre_upgrade_deregister(): + """ + Runs the "hive --service hiveserver2 --deregister <version>" command to + de-provision the server in preparation for an upgrade. This will contact + ZooKeeper to remove the server so that clients that attempt to connect + will be directed to other servers automatically. Once all + clients have drained, the server will shutdown automatically; this process + could take a very long time. + This function will obtain the Kerberos ticket if security is enabled. + :return: + """ + import params + + Logger.info('HiveServer2 executing "deregister" command in preparation for upgrade...') + + if params.security_enabled: + kinit_command=format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser}; ") + Execute(kinit_command,user=params.smokeuser) + + # calculate the current hive server version + current_hiveserver_version = _get_current_hiveserver_version() + if current_hiveserver_version is None: + raise Fail('Unable to determine the current HiveServer2 version to deregister.') + + # deregister + command = 'hive --service hiveserver2 --deregister ' + current_hiveserver_version + Execute(command, user=params.hive_user, path=params.execute_path, tries=1 ) + + +def _get_current_hiveserver_version(): + """ + Runs an "hdp-select status hive-server2" check and parses the result in order + to obtain the current version of hive. + + :return: the hiveserver2 version, such as "hdp-select status hive-server2" + """ + import params + + try: + command = 'hdp-select status hive-server2' + return_code, hdp_output = call(command, user=params.hive_user) + except Exception, e: + Logger.error(str(e)) + raise Fail('Unable to execute hdp-select command to retrieve the hiveserver2 version.') + + if return_code != 0: + raise Fail('Unable to determine the current HiveServer2 version because of a non-zero return code of {0}'.format(str(return_code))) + + # strip "hive-server2 - " off of result and test the version + current_hive_server_version = re.sub('hive-server2 - ', '', hdp_output) + match = re.match('[0-9]+.[0-9]+.[0-9]+.[0-9]+-[0-9]+', current_hive_server_version) + + if match: + return current_hive_server_version + else: + raise Fail('The extracted hiveserver2 version "{0}" does not matching any known pattern'.format(current_hive_server_version)) + + http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py index 8e5d878..3918a74 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/hive_service.py @@ -23,9 +23,7 @@ import sys import time from resource_management.core import shell -def hive_service( - name, - action='start'): +def hive_service(name, action='start', rolling_restart=False): import params @@ -38,20 +36,23 @@ def hive_service( cmd = format( "env JAVA_HOME={java64_home} {start_hiveserver2_path} {hive_log_dir}/hive-server2.out {hive_log_dir}/hive-server2.log {pid_file} {hive_server_conf_dir} {hive_log_dir}") - process_id_exists = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1") - + process_id_exists_command = format("ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1") + if action == 'start': if name == 'hiveserver2': check_fs_root() demon_cmd = format("{cmd}") - - Execute(demon_cmd, - user=params.hive_user, - environment={'HADOOP_HOME': params.hadoop_home}, - path=params.execute_path, - not_if=process_id_exists - ) + + # upgrading hiveserver2 (rolling_restart) means that there is an existing, + # de-registering hiveserver2; the pid will still exist, but the new + # hiveserver is spinning up on a new port, so the pid will be re-written + if rolling_restart: + process_id_exists_command = None + + Execute(demon_cmd, user=params.hive_user, + environment={'HADOOP_HOME': params.hadoop_home}, path=params.execute_path, + not_if=process_id_exists_command ) if params.hive_jdbc_driver == "com.mysql.jdbc.Driver" or \ params.hive_jdbc_driver == "org.postgresql.Driver" or \ @@ -96,12 +97,9 @@ def hive_service( elif action == 'stop': demon_cmd = format("sudo kill `cat {pid_file}`") - Execute(demon_cmd, - not_if = format("! ({process_id_exists})") - ) - File(pid_file, - action = "delete", - ) + Execute(demon_cmd, not_if = format("! ({process_id_exists_command})")) + + File(pid_file, action = "delete",) def check_fs_root(): import params http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py index 4e58de0..774e811 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params.py @@ -38,14 +38,27 @@ version = default("/commandParams/version", None) # Hadoop params # TODO, this logic should initialize these parameters in a file inside the HDP 2.2 stack. if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >=0: + # start out with client libraries hadoop_bin_dir = "/usr/hdp/current/hadoop-client/bin" hadoop_home = '/usr/hdp/current/hadoop-client' hive_bin = '/usr/hdp/current/hive-client/bin' hive_lib = '/usr/hdp/current/hive-client/lib' + # if this is a server action, then use the server binaries; smoke tests + # use the client binaries + command_role = default("/role", "") + server_role_dir_mapping = { 'HIVE_SERVER' : 'hive-server2', + 'HIVE_METASTORE' : 'hive-metastore' } + + if command_role in server_role_dir_mapping: + hive_server_root = server_role_dir_mapping[command_role] + hive_bin = format('/usr/hdp/current/{hive_server_root}/bin') + hive_lib = format('/usr/hdp/current/{hive_server_root}/lib') + + # there are no client versions of these, use server versions directly hcat_lib = '/usr/hdp/current/hive-webhcat/share/hcatalog' webhcat_bin_dir = '/usr/hdp/current/hive-webhcat/sbin' - + hive_specific_configs_supported = True else: hadoop_bin_dir = "/usr/bin" http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py index a8b3a8f..f1f9f37 100644 --- a/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py +++ b/ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/webhcat_server.py @@ -18,7 +18,6 @@ limitations under the License. Ambari Agent """ -import sys from resource_management import * from webhcat import webhcat @@ -27,27 +26,41 @@ from webhcat_service import webhcat_service class WebHCatServer(Script): def install(self, env): self.install_packages(env) + + def configure(self, env): import params env.set_params(params) webhcat() + def start(self, env, rolling_restart=False): import params env.set_params(params) self.configure(env) # FOR SECURITY webhcat_service(action = 'start') + def stop(self, env, rolling_restart=False): import params env.set_params(params) webhcat_service(action = 'stop') + def status(self, env): import status_params env.set_params(status_params) check_process_status(status_params.webhcat_pid_file) + + def pre_rolling_restart(self, env): + Logger.info("Executing WebHCat Rolling Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0: + Execute(format("hdp-select set hive-webhcat {version}")) + if __name__ == "__main__": WebHCatServer().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml index dbb41e0..8d8fe00 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml @@ -21,7 +21,6 @@ <target>2.2.*.*</target> <order> - <group name="ZOOKEEPER" title="Zookeeper"> <service name="ZOOKEEPER"> <component>ZOOKEEPER_SERVER</component> @@ -68,6 +67,14 @@ </batch> </group> + <group name="HIVE" title="Hive"> + <service name="HIVE"> + <component>HIVE_METASTORE</component> + <component>HIVE_SERVER</component> + <component>WEBHCAT_SERVER</component> + </service> + </group> + <group name="CLIENTS" title="Client Components"> <service name="HDFS"> <component>HDFS_CLIENT</component> @@ -118,7 +125,7 @@ </task> </execute-stage> </group> - + </order> @@ -312,8 +319,46 @@ </upgrade> </component> </service> - + <service name="HIVE"> + <component name="HIVE_METASTORE"> + <pre-upgrade> + <task xsi:type="manual"> + <message>Backup the Hive Metastore database.</message> + </task> + <task xsi:type="manual"> + <message>Run the SQL file at /usr/hdp/$version/hive/scripts/metastore/upgrade to update the Hive Metastore schema.</message> + </task> + </pre-upgrade> + <upgrade> + <task xsi:type="restart" /> + </upgrade> + </component> + + <component name="HIVE_SERVER"> + <pre-upgrade> + <task xsi:type="manual"> + <message>The HiveServer port will now change to 10010. Ensure that this port is available on each HiveServer instance.</message> + </task> + + <task xsi:type="configure"> + <type>hive-site</type> + <key>hive.server2.thrift.port</key> + <value>10010</value> + </task> + </pre-upgrade> + + <upgrade> + <task xsi:type="restart" /> + </upgrade> + </component> + + <component name="WEBHCAT_SERVER"> + <upgrade> + <task xsi:type="restart" /> + </upgrade> + </component> + <component name="HIVE_CLIENT"> <upgrade> <task xsi:type="restart" /> http://git-wip-us.apache.org/repos/asf/ambari/blob/181f3ab4/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py index ee7d8af..d00c3b5 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py +++ b/ambari-server/src/test/python/stacks/2.0.6/HIVE/test_hive_server.py @@ -17,18 +17,17 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' -import os +import socket import subprocess -from mock.mock import MagicMock, call, patch + +from mock.mock import MagicMock, patch from resource_management.core import shell -from resource_management.libraries.functions import hive_check from stacks.utils.RMFTestCase import * -import socket - class TestHiveServer(RMFTestCase): COMMON_SERVICES_PACKAGE_DIR = "HIVE/0.12.0.2.0/package" STACK_VERSION = "2.0.6" + UPGRADE_STACK_VERSION = "2.2" def test_configure_default(self): self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py", @@ -545,3 +544,57 @@ class TestHiveServer(RMFTestCase): self.assert_configure_default() self.assertFalse(socket_mock.called) self.assertFalse(s.close.called) + + + @patch("hive_server.HiveServer.pre_rolling_restart") + @patch("hive_server.HiveServer.start") + @patch("subprocess.Popen") + def test_stop_during_upgrade(self, process_mock, hive_server_start_mock, + hive_server_pre_rolling_mock): + + process_output = 'hive-server2 - 2.2.0.0-2041' + + process = MagicMock() + process.communicate.return_value = [process_output] + process.returncode = 0 + process_mock.return_value = process + + self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py", + classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json", + hdp_stack_version = self.UPGRADE_STACK_VERSION, + target = RMFTestCase.TARGET_COMMON_SERVICES ) + + self.assertTrue(process_mock.called) + self.assertEqual(process_mock.call_count,2) + + self.assertResourceCalled('Execute', 'hive --service hiveserver2 --deregister 2.2.0.0-2041', + path=['/bin:/usr/hdp/current/hive-server2/bin:/usr/hdp/current/hadoop-client/bin'], + tries=1, user='hive') + + self.assertResourceCalled('Execute', 'hdp-select set hive-server2 2.2.1.0-2065',) + + + @patch("hive_server.HiveServer.pre_rolling_restart") + @patch("hive_server.HiveServer.start") + @patch("subprocess.Popen") + def test_stop_during_upgrade_bad_hive_version(self, process_mock, hive_server_start_mock, + hive_server_pre_rolling_mock): + + process_output = 'BAD VERSION' + + process = MagicMock() + process.communicate.return_value = [process_output] + process.returncode = 0 + process_mock.return_value = process + + try: + self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/hive_server.py", + classname = "HiveServer", command = "restart", config_file = "hive-upgrade.json", + hdp_stack_version = self.UPGRADE_STACK_VERSION, + target = RMFTestCase.TARGET_COMMON_SERVICES ) + + self.fail("Invalid hive version should have caused an exception") + except: + pass + + self.assertNoMoreResources()
