Repository: ambari Updated Branches: refs/heads/branch-feature-AMBARI-21348 98f77089c -> 6bab5a537
AMBARI-21361 - Finalization Can Fail When Host Versions Changed on Stack Distribution (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bab5a53 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bab5a53 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bab5a53 Branch: refs/heads/branch-feature-AMBARI-21348 Commit: 6bab5a53789d0c50cc75b1e1397cabcc0630f385 Parents: 98f7708 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Thu Jun 29 09:26:33 2017 -0400 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Thu Jun 29 16:53:51 2017 -0400 ---------------------------------------------------------------------- .../actionmanager/ExecutionCommandWrapper.java | 32 ++- .../AmbariCustomCommandExecutionHelper.java | 4 - .../internal/UpgradeResourceProvider.java | 23 -- .../listeners/upgrade/StackVersionListener.java | 227 +++++++++++-------- 4 files changed, 159 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java index 4773c75..fc66f53 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java @@ -17,6 +17,9 @@ */ package org.apache.ambari.server.actionmanager; +import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER; +import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_PACKAGE_FOLDER; + import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -27,12 +30,17 @@ import org.apache.ambari.server.ClusterNotFoundException; import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.agent.ExecutionCommand.KeyNames; +import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.DesiredConfig; +import org.apache.ambari.server.state.ServiceInfo; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.StackInfo; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +67,12 @@ public class ExecutionCommandWrapper { @Inject private Gson gson; + /** + * Used for injecting hooks and common-services into the command. + */ + @Inject + private AmbariMetaInfo ambariMetaInfo; + @AssistedInject public ExecutionCommandWrapper(@Assisted String jsonExecutionCommand) { this.jsonExecutionCommand = jsonExecutionCommand; @@ -182,12 +196,28 @@ public class ExecutionCommandWrapper { } } + Map<String,String> commandParams = executionCommand.getCommandParams(); + ClusterVersionEntity effectiveClusterVersion = cluster.getEffectiveClusterVersion(); if (null != effectiveClusterVersion) { - executionCommand.getCommandParams().put(KeyNames.VERSION, + commandParams.put(KeyNames.VERSION, effectiveClusterVersion.getRepositoryVersion().getVersion()); } + // add the stack and common-services folders to the command + StackId stackId = cluster.getDesiredStackVersion(); + StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), + stackId.getStackVersion()); + + commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); + + String serviceName = executionCommand.getServiceName(); + if (!StringUtils.isEmpty(serviceName)) { + ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), + stackId.getStackVersion(), serviceName); + + commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); + } } catch (ClusterNotFoundException cnfe) { // it's possible that there are commands without clusters; in such cases, // just return the de-serialized command and don't try to read configs http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java index 3a672b6..6f92707 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java @@ -329,8 +329,6 @@ public class AmbariCustomCommandExecutionHelper { AmbariMetaInfo ambariMetaInfo = managementController.getAmbariMetaInfo(); ServiceInfo serviceInfo = ambariMetaInfo.getService( stackId.getStackName(), stackId.getStackVersion(), serviceName); - StackInfo stackInfo = ambariMetaInfo.getStack - (stackId.getStackName(), stackId.getStackVersion()); CustomCommandDefinition customCommandDefinition = null; ComponentInfo ci = serviceInfo.getComponentByName(componentName); @@ -474,8 +472,6 @@ public class AmbariCustomCommandExecutionHelper { } commandParams.put(COMMAND_TIMEOUT, "" + commandTimeout); - commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); - commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); Map<String, String> roleParams = execCmd.getRoleParams(); if (roleParams == null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java index d2573e1..822f94d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java @@ -17,9 +17,6 @@ */ package org.apache.ambari.server.controller.internal; -import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER; -import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_PACKAGE_FOLDER; - import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; @@ -95,7 +92,6 @@ import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceInfo; import org.apache.ambari.server.state.StackId; -import org.apache.ambari.server.state.StackInfo; import org.apache.ambari.server.state.UpgradeContext; import org.apache.ambari.server.state.UpgradeContextFactory; import org.apache.ambari.server.state.UpgradeHelper; @@ -1312,25 +1308,6 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider // Apply additional parameters to the command that come from the stage. applyAdditionalParameters(wrapper, params); - // Because custom task may end up calling a script/function inside a - // service, it is necessary to set the - // service_package_folder and hooks_folder params. - AmbariMetaInfo ambariMetaInfo = s_metaProvider.get(); - StackId stackId = context.getEffectiveStackId(); - - StackInfo stackInfo = ambariMetaInfo.getStack(stackId.getStackName(), - stackId.getStackVersion()); - - if (wrapper.getTasks() != null && wrapper.getTasks().size() > 0 - && wrapper.getTasks().get(0).getService() != null) { - String serviceName = wrapper.getTasks().get(0).getService(); - ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(), - stackId.getStackVersion(), serviceName); - - params.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder()); - params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder()); - } - ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(), "ru_execute_tasks", Collections.singletonList(filter), params); http://git-wip-us.apache.org/repos/asf/ambari/blob/6bab5a53/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java index 22d7f2e..4600912 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,18 +17,15 @@ */ package org.apache.ambari.server.events.listeners.upgrade; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.EagerSingleton; -import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent; import org.apache.ambari.server.events.publishers.VersionEventPublisher; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; +import org.apache.ambari.server.orm.entities.ClusterVersionEntity; +import org.apache.ambari.server.orm.entities.HostVersionEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.State; @@ -37,19 +34,15 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.google.inject.Inject; -import com.google.inject.Provider; import com.google.inject.Singleton; /** * The {@link StackVersionListener} class handles the propagation of versions * advertised by the {@link org.apache.ambari.server.state.ServiceComponentHost} * that bubble up to the - * {@link org.apache.ambari.server.orm.entities.HostVersionEntity} and - * eventually the - * {@link org.apache.ambari.server.orm.entities.ClusterVersionEntity} + * {@link org.apache.ambari.server.orm.entities.HostVersionEntity}. */ @Singleton @EagerSingleton @@ -60,18 +53,9 @@ public class StackVersionListener { private final static Logger LOG = LoggerFactory.getLogger(StackVersionListener.class); public static final String UNKNOWN_VERSION = State.UNKNOWN.toString(); - /** - * Used to prevent multiple threads from trying to create host alerts - * simultaneously. - */ - private Lock m_stackVersionLock = new ReentrantLock(); - @Inject private RepositoryVersionDAO repositoryVersionDAO; - @Inject - Provider<AmbariMetaInfo> ambariMetaInfo; - /** * Constructor. * @@ -83,7 +67,6 @@ public class StackVersionListener { } @Subscribe - @AllowConcurrentEvents public void onAmbariEvent(HostComponentVersionAdvertisedEvent event) { LOG.debug("Received event {}", event); @@ -96,8 +79,6 @@ public class StackVersionListener { return; } - m_stackVersionLock.lock(); - // if the cluster is upgrading, there's no need to update the repo version - // it better be right if (null != event.getRepositoryVersionId() && null == cluster.getUpgradeInProgress()) { @@ -116,70 +97,113 @@ public class StackVersionListener { // Update host component version value if needed try { - AmbariMetaInfo metaInfo = ambariMetaInfo.get(); - ComponentInfo componentInfo = metaInfo.getComponent(cluster.getDesiredStackVersion().getStackName(), - cluster.getDesiredStackVersion().getStackVersion(), sch.getServiceName(), sch.getServiceComponentName()); - ServiceComponent sc = cluster.getService(sch.getServiceName()).getServiceComponent(sch.getServiceComponentName()); - if (componentInfo.isVersionAdvertised() && StringUtils.isNotBlank(newVersion) - && !UNKNOWN_VERSION.equalsIgnoreCase(newVersion)) { - processComponentAdvertisedVersion(cluster, sch, newVersion, sc); - } else if(!sc.isVersionAdvertised() && StringUtils.isNotBlank(newVersion) - && !UNKNOWN_VERSION.equalsIgnoreCase(newVersion)) { - LOG.debug("ServiceComponent {} doesn't advertise version, " + - "however ServiceHostComponent {} on host {} advertised version as {}. Skipping version update", - sc.getName(), sch.getServiceComponentName(), sch.getHostName(), newVersion); - } else { - if (UNKNOWN_VERSION.equals(sc.getDesiredVersion())) { - processUnknownDesiredVersion(cluster, sc, sch, newVersion); - } else { - processComponentAdvertisedVersion(cluster, sch, newVersion, sc); + ServiceComponent sc = cluster.getService(sch.getServiceName()).getServiceComponent( + sch.getServiceComponentName()); + + // not advertising a version, do nothing + if (!sc.isVersionAdvertised()) { + // that's odd; a version came back - log it and still do nothing + if (!StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, newVersion)) { + LOG.debug( + "ServiceComponent {} doesn't advertise version, however ServiceHostComponent {} on host {} advertised version as {}. Skipping version update", + sc.getName(), sch.getServiceComponentName(), sch.getHostName(), newVersion); } + return; } + + boolean desiredVersionIsCurrentlyUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, + sc.getDesiredVersion()); + + // proces the UNKNOWN version being received or currently desired + if (StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, newVersion) + || desiredVersionIsCurrentlyUnknown) { + processUnknownDesiredVersion(cluster, sc, sch, newVersion); + return; + } + + processComponentAdvertisedVersion(cluster, sc, sch, newVersion); } catch (Exception e) { LOG.error( "Unable to propagate version for ServiceHostComponent on component: {}, host: {}. Error: {}", sch.getServiceComponentName(), sch.getHostName(), e.getMessage()); - } finally { - m_stackVersionLock.unlock(); } } + /** - * Update host component version - * or - * Bootstrap cluster/repo version when version is reported for the first time - * @param cluster target cluster - * @param sch target host component - * @param newVersion advertised version - * @param sc target service component + * Updates the version and {@link UpgradeState} for the specified + * {@link ServiceComponentHost} if necessary. If the version or the upgrade + * state changes, then this method will call + * {@link ServiceComponentHost#recalculateHostVersionState()} in order to + * ensure that the host version state is properly updated. + * <p/> + * + * + * @param cluster + * @param sc + * @param sch + * @param newVersion * @throws AmbariException */ - private void processComponentAdvertisedVersion(Cluster cluster, ServiceComponentHost sch, String newVersion, ServiceComponent sc) throws AmbariException { + private void processComponentAdvertisedVersion(Cluster cluster, ServiceComponent sc, + ServiceComponentHost sch, String newVersion) throws AmbariException { if (StringUtils.isBlank(newVersion)) { return; } + String previousVersion = sch.getVersion(); - if (previousVersion == null || UNKNOWN_VERSION.equalsIgnoreCase(previousVersion)) { - // value may be "UNKNOWN" when upgrading from older Ambari versions - // or if host component reports it's version for the first time - sch.setUpgradeState(UpgradeState.NONE); + String desiredVersion = sc.getDesiredVersion(); + UpgradeState upgradeState = sch.getUpgradeState(); + + // was this version expected + boolean newVersionMatchesDesired = StringUtils.equals(desiredVersion, newVersion); + + // was the prior version UNKNOWN + boolean previousVersionIsUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, previousVersion); + + boolean desiredVersionIsUnknown = StringUtils.equalsIgnoreCase(UNKNOWN_VERSION, desiredVersion); + + // is there an upgrade in progress for this component + boolean isUpgradeInProgressForThisComponent = null != cluster.getUpgradeInProgress() + && upgradeState != UpgradeState.NONE; + + // if the current version is an actual value (ie 2.2.0.0-1234 and not + // UNKNOWN), and the newly received version is unexpected, and we are not in + // an upgrade - then we really should not be changing the reported version + if (!previousVersionIsUnknown && !desiredVersionIsUnknown && !newVersionMatchesDesired + && !isUpgradeInProgressForThisComponent) { + LOG.warn( + "Received a reported version of {} for {} on {}. This was not expected since the desired version is {} and the cluster is not upgrading this component. The version will not be changed.", + newVersion, sc.getName(), sch.getHostName(), desiredVersion); + + return; + } + + // update the SCH to the new version reported + if (!StringUtils.equals(previousVersion, newVersion)) { sch.setVersion(newVersion); - bootstrapVersion(cluster, sch); - } else if (!StringUtils.equals(previousVersion, newVersion)) { - processComponentVersionChange(cluster, sc, sch, newVersion); } - } - /** - * Bootstrap cluster/repo version when version is reported for the first time - * @param cluster target cluster - * @param sch target host component - * @throws AmbariException - */ - private void bootstrapVersion(Cluster cluster, ServiceComponentHost sch) throws AmbariException { - RepositoryVersionEntity repoVersion = sch.recalculateHostVersionState(); - if (null != repoVersion) { - cluster.recalculateClusterVersionState(repoVersion); + if (previousVersion == null || previousVersionIsUnknown) { + // value may be "UNKNOWN" when upgrading from older Ambari versions + // or if host component reports it's version for the first time + sch.setUpgradeState(UpgradeState.NONE); + recalculateHostVersionAndClusterVersion(cluster, sch); + } else { + if (newVersionMatchesDesired) { + if (isUpgradeInProgressForThisComponent) { + sch.setStackVersion(cluster.getDesiredStackVersion()); + setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.COMPLETE); + } else { + // no upgrade in progress for this component, then this should always + // be NONE + setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.NONE); + } + } else { + // if the versions don't match for any reason, regardless of upgrade + // state, then VERSION_MISMATCH it + setUpgradeStateAndRecalculateHostVersions(cluster, sch, UpgradeState.VERSION_MISMATCH); + } } } @@ -197,40 +221,45 @@ public class StackVersionListener { sc.setDesiredVersion(newVersion); sch.setUpgradeState(UpgradeState.NONE); sch.setVersion(newVersion); - bootstrapVersion(cluster, sch); + + recalculateHostVersionAndClusterVersion(cluster, sch); } /** - * Focuses on cases when host component version really changed - * @param cluster target cluster - * @param sc target service component - * @param sch target host component - * @param newVersion advertised version + * @param sch + * @param upgradeState + * @throws AmbariException */ - private void processComponentVersionChange(Cluster cluster, ServiceComponent sc, - ServiceComponentHost sch, - String newVersion) { - String desiredVersion = sc.getDesiredVersion(); - UpgradeState upgradeState = sch.getUpgradeState(); - if (upgradeState == UpgradeState.IN_PROGRESS) { - // Component status update is received during upgrade process - if (desiredVersion.equals(newVersion)) { - sch.setUpgradeState(UpgradeState.COMPLETE); // Component upgrade confirmed - sch.setStackVersion(cluster.getDesiredStackVersion()); - } else { // Unexpected (wrong) version received - // Even during failed upgrade, we should not receive wrong version - // That's why mark as VERSION_MISMATCH - sch.setUpgradeState(UpgradeState.VERSION_MISMATCH); - } - } else if (upgradeState == UpgradeState.VERSION_MISMATCH && desiredVersion.equals(newVersion)) { - if (cluster.getUpgradeInProgress() != null) { - sch.setUpgradeState(UpgradeState.COMPLETE); - } else { - sch.setUpgradeState(UpgradeState.NONE); - } - } else { // No upgrade in progress, unexpected version change - sch.setUpgradeState(UpgradeState.VERSION_MISMATCH); + private void setUpgradeStateAndRecalculateHostVersions(Cluster cluster, ServiceComponentHost sch, + UpgradeState upgradeState) throws AmbariException { + + // don't need to recalculate anything here if the upgrade state is not changing + if (sch.getUpgradeState() == upgradeState) { + return; + } + + // if the upgrade state changes, then also recalculate host versions + sch.setUpgradeState(upgradeState); + + recalculateHostVersionAndClusterVersion(cluster, sch); + } + + /** + * Recalculates the {@link HostVersionEntity} for the host specified by the + * host component, taking into account all component states on that host. This + * will also trigger a {@link ClusterVersionEntity} recalculatation for the + * cluster version as well. + * + * @param cluster + * @param sch + * @throws AmbariException + */ + private void recalculateHostVersionAndClusterVersion(Cluster cluster, ServiceComponentHost sch) + throws AmbariException { + // trigger a re-calculation of the cluster state based on the SCH state + RepositoryVersionEntity repoVersion = sch.recalculateHostVersionState(); + if (null != repoVersion) { + cluster.recalculateClusterVersionState(repoVersion); } - sch.setVersion(newVersion); } -} +} \ No newline at end of file