http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java index 5b65833..3087379 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/kerberos/PrepareDisableKerberosServerAction.java @@ -35,6 +35,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.PropertyInfo; import org.apache.ambari.server.state.SecurityState; +import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.kerberos.KerberosDescriptor; @@ -177,7 +178,8 @@ public class PrepareDisableKerberosServerAction extends AbstractPrepareKerberosS String serviceName = sch.getServiceName(); if (!visitedServices.contains(serviceName)) { - StackId stackVersion = sch.getStackVersion(); + ServiceComponent serviceComponent = sch.getServiceComponent(); + StackId stackVersion = serviceComponent.getDesiredStackVersion(); visitedServices.add(serviceName);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/AbstractUpgradeServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/AbstractUpgradeServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/AbstractUpgradeServerAction.java index 5d73fac..4fc8271 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/AbstractUpgradeServerAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/AbstractUpgradeServerAction.java @@ -21,13 +21,18 @@ import java.util.Collections; import java.util.Set; import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; +import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.serveraction.AbstractServerAction; +import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.UpgradeContext; +import org.apache.ambari.server.state.UpgradeContextFactory; +import org.apache.ambari.server.state.UpgradeHelper; import org.apache.ambari.server.state.stack.upgrade.Direction; +import org.apache.ambari.server.state.stack.upgrade.UpgradeScope; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.metrics2.sink.relocated.google.common.collect.Sets; +import com.google.common.collect.Sets; import com.google.inject.Inject;; /** @@ -60,15 +65,40 @@ public abstract class AbstractUpgradeServerAction extends AbstractServerAction { protected Clusters m_clusters; /** - * @return the set of supported services + * Used to move desired repo versions forward. 
*/ - protected Set<String> getSupportedServices() { + @Inject + protected UpgradeHelper m_upgradeHelper; + + /** + * Used to create instances of {@link UpgradeContext} with injected + * dependencies. + */ + @Inject + private UpgradeContextFactory m_upgradeContextFactory; + + /** + * Gets an initialized {@link UpgradeContext} for the in-progress upgrade. + */ + protected UpgradeContext getUpgradeContext(Cluster cluster) { + UpgradeEntity upgrade = cluster.getUpgradeInProgress(); + UpgradeContext upgradeContext = m_upgradeContextFactory.create(cluster, upgrade); + + final UpgradeScope scope; + final Set<String> supportedServices; String services = getCommandParameterValue(SUPPORTED_SERVICES_KEY); if (StringUtils.isBlank(services)) { - return Collections.emptySet(); + scope = UpgradeScope.COMPLETE; + supportedServices = Collections.emptySet(); + } else { - return Sets.newHashSet(StringUtils.split(services, ',')); + scope = UpgradeScope.PARTIAL; + supportedServices = Sets.newHashSet(StringUtils.split(services, ',')); } - } + upgradeContext.setSupportedServices(supportedServices); + upgradeContext.setScope(scope); + + return upgradeContext; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ComponentVersionCheckAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ComponentVersionCheckAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ComponentVersionCheckAction.java index 52c0cf2..4a3bd9b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ComponentVersionCheckAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/ComponentVersionCheckAction.java @@ -29,7 +29,7 @@ import org.apache.ambari.server.AmbariException; import 
org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.UpgradeContext; import org.apache.commons.lang.StringUtils; import com.google.gson.JsonArray; @@ -49,13 +49,12 @@ public class ComponentVersionCheckAction extends FinalizeUpgradeAction { Map<String, String> commandParams = getExecutionCommand().getCommandParams(); - String version = commandParams.get(VERSION_KEY); - StackId targetStackId = new StackId(commandParams.get(TARGET_STACK_KEY)); String clusterName = getExecutionCommand().getClusterName(); Cluster cluster = m_clusters.getCluster(clusterName); - List<InfoTuple> errors = checkHostComponentVersions(cluster, version, targetStackId); + UpgradeContext upgradeContext = getUpgradeContext(cluster); + List<InfoTuple> errors = getHostComponentsWhichDidNotUpgrade(upgradeContext); StringBuilder outSB = new StringBuilder(); StringBuilder errSB = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java index 32d6151..a4cc757 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/FinalizeUpgradeAction.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -34,13 +33,10 @@ 
import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.events.StackUpgradeFinishEvent; import org.apache.ambari.server.events.publishers.VersionEventPublisher; -import org.apache.ambari.server.orm.dao.ClusterVersionDAO; import org.apache.ambari.server.orm.dao.HostComponentStateDAO; import org.apache.ambari.server.orm.dao.HostVersionDAO; import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO; import org.apache.ambari.server.orm.dao.StackDAO; -import org.apache.ambari.server.orm.dao.UpgradeDAO; -import org.apache.ambari.server.orm.entities.ClusterVersionEntity; import org.apache.ambari.server.orm.entities.HostComponentStateEntity; import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.HostVersionEntity; @@ -56,7 +52,9 @@ import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.UpgradeContext; import org.apache.ambari.server.state.UpgradeState; +import org.apache.ambari.server.state.stack.upgrade.Direction; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostSummary; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.text.StrBuilder; @@ -72,9 +70,6 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { "For this reason, Ambari will not remove any configs. Please ensure that all database records are correct."; @Inject - private ClusterVersionDAO clusterVersionDAO; - - @Inject private HostVersionDAO hostVersionDAO; @Inject @@ -92,12 +87,6 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { @Inject private ServiceComponentDesiredStateDAO serviceComponentDesiredStateDAO; - /** - * Gets {@link UpgradeEntity} instances. 
- */ - @Inject - private UpgradeDAO upgradeDAO; - @Inject private AmbariMetaInfo ambariMetaInfo; @@ -108,21 +97,15 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) throws AmbariException, InterruptedException { - Map<String, String> commandParams = getExecutionCommand().getCommandParams(); - - boolean isDowngrade = commandParams.containsKey(UPGRADE_DIRECTION_KEY) && - "downgrade".equals(commandParams.get(UPGRADE_DIRECTION_KEY).toLowerCase()); - - String version = commandParams.get(VERSION_KEY); - StackId originalStackId = new StackId(commandParams.get(ORIGINAL_STACK_KEY)); - StackId targetStackId = new StackId(commandParams.get(TARGET_STACK_KEY)); - String clusterName = getExecutionCommand().getClusterName(); + Cluster cluster = m_clusters.getCluster(clusterName); - if (isDowngrade) { - return finalizeDowngrade(clusterName, originalStackId, targetStackId, version); + UpgradeContext upgradeContext = getUpgradeContext(cluster); + + if (upgradeContext.getDirection() == Direction.UPGRADE) { + return finalizeUpgrade(upgradeContext); } else { - return finalizeUpgrade(clusterName, version, commandParams); + return finalizeDowngrade(upgradeContext); } } @@ -132,32 +115,36 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { * @param version the target version of the upgrade * @return the command report */ - private CommandReport finalizeUpgrade(String clusterName, String version, - Map<String, String> commandParams) + private CommandReport finalizeUpgrade(UpgradeContext upgradeContext) throws AmbariException, InterruptedException { StringBuilder outSB = new StringBuilder(); StringBuilder errSB = new StringBuilder(); try { - outSB.append(MessageFormat.format("Begin finalizing the upgrade of cluster {0} to version {1}\n", clusterName, version)); + String message; + Set<String> servicesInUpgrade = upgradeContext.getSupportedServices(); + if 
(servicesInUpgrade.isEmpty()) { + message = MessageFormat.format("Finalizing the upgrade to {0} for all cluster services.", + upgradeContext.getVersion()); + } else { + message = MessageFormat.format( + "Finalizing the upgrade to {0} for the following services: {1}", + upgradeContext.getVersion(), StringUtils.join(servicesInUpgrade, ',')); + } + + outSB.append(message).append(System.lineSeparator()); - Cluster cluster = m_clusters.getCluster(clusterName); + Cluster cluster = upgradeContext.getCluster(); StackId clusterDesiredStackId = cluster.getDesiredStackVersion(); StackId clusterCurrentStackId = cluster.getCurrentStackVersion(); + String version = upgradeContext.getVersion(); + RepositoryVersionEntity repositoryVersion = upgradeContext.getTargetRepositoryVersion(); - ClusterVersionEntity upgradingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion( - clusterName, clusterDesiredStackId, version); - - if (upgradingClusterVersion == null) { - throw new AmbariException(MessageFormat.format( - "Cluster stack version {0} not found", version)); - } - - // Validate that all of the hosts with a version in the cluster have the - // version being upgraded to, and it is in an allowed state. 
- List<HostVersionEntity> hostVersions = hostVersionDAO.findByClusterStackAndVersion( - clusterName, clusterDesiredStackId, version); + // for all hosts participating in this upgrade, validate their repo + // versions + List<HostVersionEntity> hostVersions = hostVersionDAO.findHostVersionByClusterAndRepository( + cluster.getClusterId(), repositoryVersion); // Will include hosts whose state is INSTALLED Set<HostVersionEntity> hostVersionsAllowed = new HashSet<>(); @@ -213,25 +200,26 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { // throw an exception if there are hosts which are not not fully upgraded if (hostsWithoutCorrectVersionState.size() > 0) { - String message = String.format("The following %d host(s) have not been upgraded to version %s. " + - "Please install and upgrade the Stack Version on those hosts and try again.\nHosts: %s\n", + message = String.format("The following %d host(s) have not been upgraded to version %s. " + + "Please install and upgrade the Stack Version on those hosts and try again.\nHosts: %s", hostsWithoutCorrectVersionState.size(), version, StringUtils.join(hostsWithoutCorrectVersionState, ", ")); outSB.append(message); + outSB.append(System.lineSeparator()); throw new AmbariException(message); } // iterate through all host components and make sure that they are on the // correct version; if they are not, then this will throw an exception - List<InfoTuple> errors = checkHostComponentVersions(cluster, version, clusterDesiredStackId); + List<InfoTuple> errors = getHostComponentsWhichDidNotUpgrade(upgradeContext); if (! errors.isEmpty()) { StrBuilder messageBuff = new StrBuilder( String.format( "The following %d host component(s) " + "have not been upgraded to version %s. 
Please install and upgrade " - + "the Stack Version on those hosts and try again.\nHost components:\n", - errors.size(), version)); + + "the Stack Version on those hosts and try again.\nHost components:", + errors.size(), version)).append(System.lineSeparator()); for (InfoTuple error : errors) { messageBuff.append(String.format("%s on host %s\n", error.componentName, error.hostName)); @@ -240,29 +228,9 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { throw new AmbariException(messageBuff.toString()); } - - // we're guaranteed to be ready transition to upgraded now; ensure that - // the transition will be allowed if the cluster state is not upgraded - upgradingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion(clusterName, - clusterDesiredStackId, version); - - if (RepositoryVersionState.INSTALLING == upgradingClusterVersion.getState()) { - cluster.transitionClusterVersion(clusterDesiredStackId, version, RepositoryVersionState.INSTALLED); - - upgradingClusterVersion = clusterVersionDAO.findByClusterAndStackAndVersion( - clusterName, clusterDesiredStackId, version); - } - - // we cannot finalize since the cluster was not ready to move into the - // upgraded state - if (RepositoryVersionState.INSTALLED != upgradingClusterVersion.getState()) { - throw new AmbariException(String.format("The cluster stack version state %s is not allowed to transition directly into %s", - upgradingClusterVersion.getState(), RepositoryVersionState.CURRENT.toString())); - } - outSB.append( - String.format("Finalizing the upgraded state of host components in %d host(s).\n", - hostVersionsAllowed.size())); + String.format("Finalizing the upgrade state of %d host(s).", + hostVersionsAllowed.size())).append(System.lineSeparator()); // Reset the upgrade state for (HostVersionEntity hostVersion : hostVersionsAllowed) { @@ -275,37 +243,29 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { // Impacts all hosts that have a version 
outSB.append( - String.format("Finalizing the version for %d host(s).\n", hostVersionsAllowed.size())); - cluster.mapHostVersions(hostsToUpdate, upgradingClusterVersion, RepositoryVersionState.CURRENT); + String.format("Finalizing the version for %d host(s).", + hostVersionsAllowed.size())).append(System.lineSeparator()); + versionEventPublisher.publish(new StackUpgradeFinishEvent(cluster)); - // Reset upgrade state - cluster.setUpgradeEntity(null); // transitioning the cluster into CURRENT will update the current/desired // stack values - outSB.append(String.format("Finalizing the version for cluster %s.\n", clusterName)); + outSB.append( + String.format("Finalizing the version for cluster %s.", cluster.getClusterName())).append( + System.lineSeparator()); + cluster.transitionClusterVersion(clusterDesiredStackId, version, RepositoryVersionState.CURRENT); - if (commandParams.containsKey(REQUEST_ID)) { - String requestId = commandParams.get(REQUEST_ID); - UpgradeEntity upgradeEntity = upgradeDAO.findUpgradeByRequestId(Long.valueOf(requestId)); - - if (null != upgradeEntity) { - outSB.append("Creating upgrade history.\n"); - writeComponentHistory(cluster, upgradeEntity, clusterCurrentStackId, - clusterDesiredStackId); - } else { - String warning = String.format( - "Unable to create upgrade history because no upgrade could be found for request with ID %s\n", - requestId); + outSB.append("Creating upgrade history...").append(System.lineSeparator()); + writeComponentHistory(upgradeContext); - outSB.append(warning); - } - } + // Reset upgrade state + cluster.setUpgradeEntity(null); - outSB.append("Upgrade was successful!\n"); + message = String.format("The upgrade to %s has completed.", upgradeContext.getVersion()); + outSB.append(message).append(System.lineSeparator()); return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", outSB.toString(), errSB.toString()); } catch (Exception e) { errSB.append(e.getMessage()); @@ -316,107 +276,64 @@ public class 
FinalizeUpgradeAction extends AbstractUpgradeServerAction { /** * Execution path for downgrade. * - * @param clusterName - * the name of the cluster the downgrade is for - * @paran originalStackId the stack ID of the cluster before the upgrade. - * @paran targetStackId the stack ID that was desired for this upgrade. - * @param version - * the target version of the downgrade + * @param upgradeContext + * the upgrade context (not {@code null}). * @return the command report */ - private CommandReport finalizeDowngrade(String clusterName, - StackId originalStackId, StackId targetStackId, String version) + private CommandReport finalizeDowngrade(UpgradeContext upgradeContext) throws AmbariException, InterruptedException { - StringBuilder out = new StringBuilder(); - StringBuilder err = new StringBuilder(); + StringBuilder outSB = new StringBuilder(); + StringBuilder errSB = new StringBuilder(); try { - Cluster cluster = m_clusters.getCluster(clusterName); + Cluster cluster = upgradeContext.getCluster(); StackId currentClusterStackId = cluster.getCurrentStackVersion(); + RepositoryVersionEntity repositoryVersion = upgradeContext.getTargetRepositoryVersion(); - // Safety check that the cluster's stack (from clusterstate's current_stack_id) is equivalent to the - // cluster's CURRENT repo version's stack. This is to avoid deleting configs from the target stack if the customer - // ended up modifying their database manually after a stack upgrade and forgot to call "Save DB State". - ClusterVersionEntity currentClusterVersion = cluster.getCurrentClusterVersion(); - RepositoryVersionEntity currentRepoVersion = currentClusterVersion.getRepositoryVersion(); - StackId currentRepoStackId = currentRepoVersion.getStackId(); - if (!currentRepoStackId.equals(originalStackId)) { - String msg = String.format("The stack of Cluster %s's CURRENT repo version is %s, yet the original stack id from " + - "the Stack Upgrade has a different value of %s. 
%s", - clusterName, currentRepoStackId.getStackId(), originalStackId.getStackId(), PREVIOUS_UPGRADE_NOT_COMPLETED_MSG); - out.append(msg); - err.append(msg); - throw new AmbariException("The source target stack doesn't match the cluster's CURRENT repo version's stack."); - } - - // This was a cross-stack upgrade, meaning that configurations were created that now need to be removed. - if (!originalStackId.equals(targetStackId)) { - out.append(String.format("Will remove configs since the original stack %s differs from the target stack %s " + - "that Ambari just downgraded from.", originalStackId.getStackId(), targetStackId.getStackId())); - cluster.removeConfigurations(targetStackId); - } - - // !!! find and make sure the cluster_version EXCEPT current are set back - out.append(String.format("Searching for current version for %s\n", - clusterName)); - - ClusterVersionEntity clusterVersion = clusterVersionDAO.findByClusterAndStateCurrent(clusterName); - if (null == clusterVersion) { - throw new AmbariException("Could not find current cluster version"); + String message; + Set<String> servicesInUpgrade = upgradeContext.getSupportedServices(); + if (servicesInUpgrade.isEmpty()) { + message = MessageFormat.format("Finalizing the downgrade to {0} for all cluster services.", + upgradeContext.getVersion()); + } else { + message = MessageFormat.format( + "Finalizing the downgrade to {0} for the following services: {1}", + upgradeContext.getVersion(), StringUtils.join(servicesInUpgrade, ',')); } - out.append(String.format("Comparing downgrade version %s to current cluster version %s\n", - version, - clusterVersion.getRepositoryVersion().getVersion())); + outSB.append(message).append(System.lineSeparator()); + outSB.append(message).append(System.lineSeparator()); - if (!version.equals(clusterVersion.getRepositoryVersion().getVersion())) { - throw new AmbariException( - String.format("Downgrade version %s is not the current cluster version of %s", - version, 
clusterVersion.getRepositoryVersion().getVersion())); - } else { - out.append(String.format("Downgrade version is the same as current. Searching " + - "for cluster versions that do not match %s\n", version)); - } + // iterate through all host components and make sure that they are on the + // correct version; if they are not, then this will throw an exception + List<InfoTuple> errors = getHostComponentsWhichDidNotUpgrade(upgradeContext); + if (!errors.isEmpty()) { + StrBuilder messageBuff = new StrBuilder(String.format( + "The following %d host component(s) " + "have not been downgraded to version %s\n", + errors.size(), upgradeContext.getVersion())).append(System.lineSeparator()); - Set<String> badVersions = new HashSet<>(); - - // update the cluster version - for (ClusterVersionEntity cve : clusterVersionDAO.findByCluster(clusterName)) { - switch (cve.getState()) { - case INSTALL_FAILED: - case INSTALLED: - case INSTALLING: { - badVersions.add(cve.getRepositoryVersion().getVersion()); - cve.setState(RepositoryVersionState.INSTALLED); - clusterVersionDAO.merge(cve); - break; - } - default: - break; + for (InfoTuple error : errors) { + messageBuff.append(String.format("%s on host %s", error.componentName, error.hostName)); + messageBuff.append(System.lineSeparator()); } - } - out.append(String.format("Found %d other version(s) not matching downgrade: %s\n", - badVersions.size(), StringUtils.join(badVersions, ", "))); + throw new AmbariException(messageBuff.toString()); + } - Set<String> badHosts = new HashSet<>(); - for (String badVersion : badVersions) { - List<HostVersionEntity> hostVersions = hostVersionDAO.findByClusterStackAndVersion( - clusterName, targetStackId, badVersion); + // find host versions + List<HostVersionEntity> hostVersions = hostVersionDAO.findHostVersionByClusterAndRepository( + cluster.getClusterId(), repositoryVersion); - for (HostVersionEntity hostVersion : hostVersions) { - badHosts.add(hostVersion.getHostName()); + for( HostVersionEntity 
hostVersion : hostVersions ){ + if( hostVersion.getState() != RepositoryVersionState.INSTALLED ){ hostVersion.setState(RepositoryVersionState.INSTALLED); hostVersionDAO.merge(hostVersion); } - } - out.append(String.format("Found %d hosts not matching downgrade version: %s\n", - badHosts.size(), version)); + List<HostComponentStateEntity> hostComponentStates = hostComponentStateDAO.findByHost( + hostVersion.getHostName()); - for (String badHost : badHosts) { - List<HostComponentStateEntity> hostComponentStates = hostComponentStateDAO.findByHost(badHost); for (HostComponentStateEntity hostComponentState : hostComponentStates) { hostComponentState.setUpgradeState(UpgradeState.NONE); hostComponentStateDAO.merge(hostComponentState); @@ -427,37 +344,38 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { // original value cluster.setDesiredStackVersion(currentClusterStackId); versionEventPublisher.publish(new StackUpgradeFinishEvent(cluster)); + // Reset upgrade state cluster.setUpgradeEntity(null); - return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", - out.toString(), err.toString()); - + return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", outSB.toString(), errSB.toString()); } catch (Exception e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); - err.append(sw.toString()); + errSB.append(sw.toString()); - return createCommandReport(-1, HostRoleStatus.FAILED, "{}", - out.toString(), err.toString()); + return createCommandReport(-1, HostRoleStatus.FAILED, "{}", outSB.toString(), errSB.toString()); } } /** - * Confirms that all host components that are able to provide hdp version, - * have been upgraded to the target version. 
- * @param cluster the cluster the upgrade is for - * @param desiredVersion the target version of the upgrade - * @param targetStackId the target stack id for meta-info lookup - * @return the list of {@link InfoTuple} objects of host components in error + * Gets any host components which have not been properly upgraded. + * + * @param upgradeContext + * the upgrade context (not {@code null}). + * @return a list of {@link InfoTuple} representing components which should + * have been upgraded but were not. */ - protected List<InfoTuple> checkHostComponentVersions(Cluster cluster, String desiredVersion, StackId targetStackId) + protected List<InfoTuple> getHostComponentsWhichDidNotUpgrade(UpgradeContext upgradeContext) throws AmbariException { ArrayList<InfoTuple> errors = new ArrayList<>(); - Set<String> supportedServices = getSupportedServices(); + Cluster cluster = upgradeContext.getCluster(); + Set<String> supportedServices = upgradeContext.getSupportedServices(); + RepositoryVersionEntity repositoryVersionEntity = upgradeContext.getTargetRepositoryVersion(); + StackId targetStackId = repositoryVersionEntity.getStackId(); for (Service service : cluster.getServices().values()) { @@ -471,17 +389,12 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { ComponentInfo componentInfo = ambariMetaInfo.getComponent(targetStackId.getStackName(), targetStackId.getStackVersion(), service.getName(), serviceComponent.getName()); - if (!componentInfo.isVersionAdvertised()) { - StackId desired = serviceComponentHost.getDesiredStackVersion(); - StackId actual = serviceComponentHost.getStackVersion(); - if (!desired.equals(actual)) { - serviceComponentHost.setStackVersion(desired); + if (componentInfo.isVersionAdvertised()) { + if (!StringUtils.equals(upgradeContext.getVersion(), + serviceComponentHost.getVersion())) { + errors.add(new InfoTuple(service.getName(), serviceComponent.getName(), + serviceComponentHost.getHostName(), 
serviceComponentHost.getVersion())); } - } else if (componentInfo.isVersionAdvertised() - && !serviceComponentHost.getVersion().equals(desiredVersion)) { - errors.add(new InfoTuple( - service.getName(), serviceComponent.getName(), - serviceComponentHost.getHostName(), serviceComponentHost.getVersion())); } } } @@ -490,15 +403,36 @@ public class FinalizeUpgradeAction extends AbstractUpgradeServerAction { return errors; } - private void writeComponentHistory(Cluster cluster, UpgradeEntity upgradeEntity, - StackId fromStackId, StackId toStackId) { + /** + * Writes the upgrade history for all components which participated in the + * upgrade. + * + * @param upgradeContext the upgrade context (not {@code null}). + */ + private void writeComponentHistory(UpgradeContext upgradeContext) throws AmbariException { + Cluster cluster = upgradeContext.getCluster(); + UpgradeEntity upgradeEntity = cluster.getUpgradeInProgress(); + Collection<Service> services = cluster.getServices().values(); + RepositoryVersionEntity repositoryVersion = upgradeContext.getTargetRepositoryVersion(); + StackId sourcceStackId = upgradeContext.getOriginalStackId(); + StackId targetStackId = repositoryVersion.getStackId(); + + StackEntity fromStack = stackDAO.find(sourcceStackId.getStackName(), sourcceStackId.getStackVersion()); + StackEntity toStack = stackDAO.find(targetStackId.getStackName(), targetStackId.getStackVersion()); + - StackEntity fromStack = stackDAO.find(fromStackId.getStackName(), fromStackId.getStackVersion()); - StackEntity toStack = stackDAO.find(toStackId.getStackName(), toStackId.getStackVersion()); + if (!upgradeContext.getSupportedServices().isEmpty()) { + services = new ArrayList<>(); + + Set<String> serviceNames = upgradeContext.getSupportedServices(); + for (String serviceName : serviceNames) { + services.add(cluster.getService(serviceName)); + } + } // for every service component, if it was included in the upgrade then // create a historical entry - for (Service service : 
cluster.getServices().values()) { + for (Service service : services) { for (ServiceComponent serviceComponent : service.getServiceComponents().values()) { if (serviceComponent.isVersionAdvertised()) { // create the historical entry http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/UpdateDesiredStackAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/UpdateDesiredStackAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/UpdateDesiredStackAction.java index 7bcb9d0..22f2e73 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/UpdateDesiredStackAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/upgrades/UpdateDesiredStackAction.java @@ -21,7 +21,9 @@ import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.VERSION; import java.io.PrintWriter; import java.io.StringWriter; +import java.text.MessageFormat; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.ambari.server.AmbariException; @@ -31,14 +33,15 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.internal.UpgradeResourceProvider; -import org.apache.ambari.server.serveraction.AbstractServerAction; +import org.apache.ambari.server.orm.entities.UpgradeEntity; import org.apache.ambari.server.serveraction.ServerAction; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.StackId; -import org.apache.ambari.server.state.StackInfo; +import org.apache.ambari.server.state.UpgradeContext; import 
org.apache.ambari.server.state.stack.UpgradePack; import org.apache.ambari.server.state.stack.upgrade.Direction; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +53,7 @@ import com.google.inject.Inject; * actually changed half-way through calculating the Actions, and this serves to update the database to make it * evident to the user at which point it changed. */ -public class UpdateDesiredStackAction extends AbstractServerAction { +public class UpdateDesiredStackAction extends AbstractUpgradeServerAction { /** * Logger. @@ -91,22 +94,27 @@ public class UpdateDesiredStackAction extends AbstractServerAction { @Inject private Configuration m_configuration; + /** + * {@inheritDoc} + */ @Override public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext) throws AmbariException, InterruptedException { + Map<String, String> commandParams = getExecutionCommand().getCommandParams(); + String clusterName = getExecutionCommand().getClusterName(); + Cluster cluster = clusters.getCluster(clusterName); + UpgradeEntity upgrade = cluster.getUpgradeInProgress(); + + UpgradeContext upgradeContext = getUpgradeContext(cluster); StackId originalStackId = new StackId(commandParams.get(COMMAND_PARAM_ORIGINAL_STACK)); StackId targetStackId = new StackId(commandParams.get(COMMAND_PARAM_TARGET_STACK)); - Direction direction = Direction.UPGRADE; - if(commandParams.containsKey(COMMAND_PARAM_DIRECTION) - && "downgrade".equals(commandParams.get(COMMAND_PARAM_DIRECTION).toLowerCase())) { - direction = Direction.DOWNGRADE; - } - String version = commandParams.get(COMMAND_PARAM_VERSION); - String upgradePackName = commandParams.get(COMMAND_PARAM_UPGRADE_PACK); - String clusterName = getExecutionCommand().getClusterName(); - UpgradePack upgradePack = ambariMetaInfo.getUpgradePacks(originalStackId.getStackName(), originalStackId.getStackVersion()).get(upgradePackName); + + String upgradePackName = 
upgrade.getUpgradePackage(); + + UpgradePack upgradePack = ambariMetaInfo.getUpgradePacks(originalStackId.getStackName(), + originalStackId.getStackVersion()).get(upgradePackName); Map<String, String> roleParams = getExecutionCommand().getRoleParams(); @@ -120,74 +128,56 @@ public class UpdateDesiredStackAction extends AbstractServerAction { } // invalidate any cached effective ID - Cluster cluster = clusters.getCluster(clusterName); cluster.invalidateUpgradeEffectiveVersion(); - return updateDesiredStack(cluster, originalStackId, targetStackId, version, direction, + return updateDesiredRepositoryVersion(cluster, originalStackId, targetStackId, upgradeContext, upgradePack, userName); } /** - * Set the cluster's Desired Stack Id during an upgrade. + * Sets the desired repository version for services participating in the + * upgrade. * - * @param cluster the cluster - * @param originalStackId the stack Id of the cluster before the upgrade. - * @param targetStackId the stack Id that was desired for this upgrade. - * @param direction direction, either upgrade or downgrade - * @param upgradePack Upgrade Pack to use - * @param userName username performing the action + * @param cluster + * the cluster + * @param originalStackId + * the stack Id of the cluster before the upgrade. + * @param targetStackId + * the stack Id that was desired for this upgrade. 
+ * @param direction + * direction, either upgrade or downgrade + * @param upgradePack + * Upgrade Pack to use + * @param userName + * username performing the action * @return the command report to return */ - private CommandReport updateDesiredStack( + private CommandReport updateDesiredRepositoryVersion( Cluster cluster, StackId originalStackId, StackId targetStackId, - String version, Direction direction, UpgradePack upgradePack, String userName) + UpgradeContext upgradeContext, UpgradePack upgradePack, String userName) throws AmbariException, InterruptedException { - String clusterName = cluster.getClusterName(); StringBuilder out = new StringBuilder(); StringBuilder err = new StringBuilder(); try { - StackId currentClusterStackId = cluster.getCurrentStackVersion(); - out.append(String.format("Params: %s %s %s %s %s %s\n", - clusterName, originalStackId.getStackId(), targetStackId.getStackId(), version, direction.getText(false), upgradePack.getName())); - - out.append(String.format("Checking if can update the Desired Stack Id to %s. The cluster's current Stack Id is %s\n", targetStackId.getStackId(), currentClusterStackId.getStackId())); - - // Ensure that the target stack id exist - StackInfo desiredClusterStackInfo = ambariMetaInfo.getStack(targetStackId.getStackName(), targetStackId.getStackVersion()); - if (null == desiredClusterStackInfo) { - String message = String.format("Parameter %s has an invalid value: %s. That Stack Id does not exist.\n", - COMMAND_PARAM_TARGET_STACK, targetStackId.getStackId()); - err.append(message); - out.append(message); - return createCommandReport(-1, HostRoleStatus.FAILED, "{}", out.toString(), err.toString()); - } - - // Ensure that the current Stack Id coincides with the parameter that the user passed in. - if (!currentClusterStackId.equals(originalStackId)) { - String message = String.format("Parameter %s has invalid value: %s. 
" + - "The cluster is currently on stack %s, " + currentClusterStackId.getStackId() + - ", yet the parameter to this function indicates a different value.\n", COMMAND_PARAM_ORIGINAL_STACK, originalStackId.getStackId(), currentClusterStackId.getStackId()); - err.append(message); - out.append(message); - return createCommandReport(-1, HostRoleStatus.FAILED, "{}", out.toString(), err.toString()); - } - - // Check for a no-op - if (currentClusterStackId.equals(targetStackId)) { - String message = String.format("Success! The cluster's Desired Stack Id was already set to %s\n", targetStackId.getStackId()); - out.append(message); - return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", out.toString(), err.toString()); + UpgradeResourceProvider upgradeResourceProvider = new UpgradeResourceProvider(AmbariServer.getController()); + upgradeResourceProvider.applyStackAndProcessConfigurations(upgradeContext); + m_upgradeHelper.putComponentsToUpgradingState(upgradeContext); + + final String message; + Set<String> servicesInUpgrade = upgradeContext.getSupportedServices(); + if (servicesInUpgrade.isEmpty()) { + message = MessageFormat.format( + "Updating the desired repository version to {0} for all cluster services.", + upgradeContext.getVersion()); + } else { + message = MessageFormat.format( + "Updating the desired repository version to {0} for the following services: {1}", + upgradeContext.getVersion(), StringUtils.join(servicesInUpgrade, ',')); } - // Create Create new configurations that are a merge between the current stack and the desired stack - // Also updates the desired stack version. - UpgradeResourceProvider upgradeResourceProvider = new UpgradeResourceProvider(AmbariServer.getController()); - upgradeResourceProvider.applyStackAndProcessConfigurations(targetStackId.getStackName(), cluster, version, direction, upgradePack, userName); - String message = String.format("Success! 
Set cluster's %s Desired Stack Id to %s.\n", clusterName, targetStackId.getStackId()); out.append(message); - return createCommandReport(0, HostRoleStatus.COMPLETED, "{}", out.toString(), err.toString()); } catch (Exception e) { StringWriter sw = new StringWriter(); http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index 1ef204d..88c5a59 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -196,15 +196,6 @@ public interface Cluster { void setDesiredStackVersion(StackId stackVersion) throws AmbariException; /** - * Sets the desired stack version, optionally setting all owned services, - * components, and host components - * @param stackId the stack id - * @param cascade {@code true} to cascade the desired version - */ - void setDesiredStackVersion(StackId stackId, boolean cascade) throws AmbariException; - - - /** * Get current stack version * @return */ @@ -217,17 +208,6 @@ public interface Cluster { void setCurrentStackVersion(StackId stackVersion) throws AmbariException; /** - * Create host versions for all of the hosts that don't already have the stack version. 
- * @param hostNames Collection of host names - * @param currentClusterVersion Entity that contains the cluster's current stack (with its name and version) - * @param desiredState Desired state must be {@link RepositoryVersionState#CURRENT} or {@link RepositoryVersionState#UPGRADING} - * @throws AmbariException - */ - void mapHostVersions(Set<String> hostNames, - ClusterVersionEntity currentClusterVersion, - RepositoryVersionState desiredState) throws AmbariException; - - /** * Creates or updates host versions for all of the hosts within a cluster * based on state of cluster stack version. This is used to transition all * hosts into the correct state (which may not be @@ -535,11 +515,17 @@ public interface Cluster { /** * Add service to the cluster + * * @param serviceName + * the name of the service to add (not {@code null}). + * @param repositoryVersion + * the repository from which the service should be installed (not + * {@code null}). * @return * @throws AmbariException */ - Service addService(String serviceName) throws AmbariException; + Service addService(String serviceName, RepositoryVersionEntity repositoryVersion) + throws AmbariException; /** * Fetch desired configs for list of hosts in cluster http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java index 5964e33..7849463 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Service.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ServiceResponse; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; public 
interface Service { @@ -66,8 +67,6 @@ public interface Service { StackId getDesiredStackVersion(); - void setDesiredStackVersion(StackId stackVersion); - ServiceResponse convertToResponse(); void debugDump(StringBuilder sb); @@ -139,6 +138,16 @@ public interface Service { */ void setCredentialStoreEnabled(boolean credentialStoreEnabled); + /** + * @return + */ + RepositoryVersionEntity getDesiredRepositoryVersion(); + + /** + * @param desiredRepositoryVersion + */ + void setDesiredRepositoryVersion(RepositoryVersionEntity desiredRepositoryVersion); + enum Type { HDFS, GLUSTERFS, http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java index b5b6821..80b4470 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponent.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ServiceComponentResponse; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; public interface ServiceComponent { @@ -51,13 +52,18 @@ public interface ServiceComponent { void setDesiredState(State state); - StackId getDesiredStackVersion(); + /** + * Gets the desired repository for this service component. 
+ * + * @return + */ + RepositoryVersionEntity getDesiredRepositoryVersion(); - void setDesiredStackVersion(StackId stackVersion); + StackId getDesiredStackVersion(); String getDesiredVersion(); - void setDesiredVersion(String version); + void setDesiredRepositoryVersion(RepositoryVersionEntity repositoryVersionEntity); /** * Refresh Component info due to current stack http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java index 104e456..b7f8d29 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHost.java @@ -91,10 +91,6 @@ public interface ServiceComponentHost { void setDesiredState(State state); - StackId getDesiredStackVersion(); - - void setDesiredStackVersion(StackId stackVersion); - State getState(); void setState(State state); @@ -167,10 +163,6 @@ public interface ServiceComponentHost { */ UpgradeState getUpgradeState(); - StackId getStackVersion(); - - void setStackVersion(StackId stackVersion); - HostComponentAdminState getComponentAdminState(); void setComponentAdminState(HostComponentAdminState attribute); @@ -251,4 +243,11 @@ public interface ServiceComponentHost { HostComponentDesiredStateEntity getDesiredStateEntity(); + /** + * Gets the service component. + * + * @return the service component (never {@code null}). 
+ */ + ServiceComponent getServiceComponent(); + } http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java index 4cfb250..1f9dc5b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java @@ -120,17 +120,13 @@ public class ServiceComponentImpl implements ServiceComponent { this.stackDAO = stackDAO; this.eventPublisher = eventPublisher; - StackId stackId = service.getDesiredStackVersion(); - StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion()); - ServiceComponentDesiredStateEntity desiredStateEntity = new ServiceComponentDesiredStateEntity(); desiredStateEntity.setComponentName(componentName); desiredStateEntity.setDesiredState(State.INIT); - desiredStateEntity.setDesiredVersion(State.UNKNOWN.toString()); desiredStateEntity.setServiceName(service.getName()); desiredStateEntity.setClusterId(service.getClusterId()); desiredStateEntity.setRecoveryEnabled(false); - desiredStateEntity.setDesiredStack(stackEntity); + desiredStateEntity.setDesiredRepositoryVersion(service.getDesiredRepositoryVersion()); updateComponentInfo(); @@ -394,22 +390,16 @@ public class ServiceComponentImpl implements ServiceComponent { } } + /** + * {@inheritDoc} + */ @Override - public void setDesiredStackVersion(StackId stack) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting DesiredStackVersion of Service" + ", clusterName=" - + service.getCluster().getClusterName() + ", clusterId=" - + service.getCluster().getClusterId() + ", serviceName=" + service.getName() - + ", 
serviceComponentName=" + getName() + ", oldDesiredStackVersion=" - + getDesiredStackVersion() + ", newDesiredStackVersion=" + stack); - } - + public void setDesiredRepositoryVersion(RepositoryVersionEntity repositoryVersionEntity) { ServiceComponentDesiredStateEntity desiredStateEntity = serviceComponentDesiredStateDAO.findById( desiredStateEntityId); if (desiredStateEntity != null) { - StackEntity stackEntity = stackDAO.find(stack.getStackName(), stack.getStackVersion()); - desiredStateEntity.setDesiredStack(stackEntity); + desiredStateEntity.setDesiredRepositoryVersion(repositoryVersionEntity); desiredStateEntity = serviceComponentDesiredStateDAO.merge(desiredStateEntity); } else { LOG.warn("Setting a member on an entity object that may have been " @@ -417,26 +407,23 @@ public class ServiceComponentImpl implements ServiceComponent { } } + /** + * {@inheritDoc} + */ @Override - public String getDesiredVersion() { + public RepositoryVersionEntity getDesiredRepositoryVersion() { ServiceComponentDesiredStateEntity desiredStateEntity = serviceComponentDesiredStateDAO.findById( desiredStateEntityId); - return desiredStateEntity.getDesiredVersion(); + return desiredStateEntity.getDesiredRepositoryVersion(); } @Override - public void setDesiredVersion(String version) { + public String getDesiredVersion() { ServiceComponentDesiredStateEntity desiredStateEntity = serviceComponentDesiredStateDAO.findById( desiredStateEntityId); - if (desiredStateEntity != null) { - desiredStateEntity.setDesiredVersion(version); - desiredStateEntity = serviceComponentDesiredStateDAO.merge(desiredStateEntity); - } else { - LOG.warn("Setting a member on an entity object that may have been " + - "previously deleted, serviceName = " + (service != null ? 
service.getName() : "")); - } + return desiredStateEntity.getDesiredVersion(); } @Override @@ -693,6 +680,7 @@ public class ServiceComponentImpl implements ServiceComponent { if (MapUtils.isNotEmpty(map)) { String desiredVersion = component.getDesiredVersion(); + RepositoryVersionEntity desiredRepositoryVersion = service.getDesiredRepositoryVersion(); List<HostComponentStateEntity> hostComponents = hostComponentDAO.findByServiceAndComponentAndNotVersion( component.getServiceName(), component.getComponentName(), reportedVersion); @@ -705,7 +693,7 @@ public class ServiceComponentImpl implements ServiceComponent { if (StackVersionListener.UNKNOWN_VERSION.equals(desiredVersion)) { if (CollectionUtils.isEmpty(hostComponents)) { // all host components are the same version as reported - component.setDesiredVersion(reportedVersion); + component.setDesiredRepositoryVersion(desiredRepositoryVersion); component.setRepositoryState(RepositoryVersionState.CURRENT); } else { // desired is UNKNOWN and there's a mix of versions in the host components http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceFactory.java index a3a041b..1e1795e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceFactory.java @@ -19,10 +19,33 @@ package org.apache.ambari.server.state; import org.apache.ambari.server.orm.entities.ClusterServiceEntity; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; public interface ServiceFactory { - Service createNew(Cluster cluster, String serviceName); + /** + * Creates a new service in memory and then persists it to the 
database. + * + * @param cluster + * the cluster the service is for (not {@code null). + * @param serviceName + * the name of the service (not {@code null). + * @param desiredRepositoryVersion + * the repository version of the service (not {@code null). + * @return + */ + Service createNew(Cluster cluster, String serviceName, + RepositoryVersionEntity desiredRepositoryVersion); + /** + * Creates an in-memory representation of a service from an existing database + * object. + * + * @param cluster + * the cluster the service is installed in (not {@code null). + * @param serviceEntity + * the entity the existing database entry (not {@code null). + * @return + */ Service createExisting(Cluster cluster, ClusterServiceEntity serviceEntity); } http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java index a0c0db1..e537326 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.state; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,11 +40,11 @@ import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ClusterServiceDAO; import org.apache.ambari.server.orm.dao.ServiceConfigDAO; import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO; -import org.apache.ambari.server.orm.dao.StackDAO; import org.apache.ambari.server.orm.entities.ClusterConfigEntity; import org.apache.ambari.server.orm.entities.ClusterEntity; import org.apache.ambari.server.orm.entities.ClusterServiceEntity; import 
org.apache.ambari.server.orm.entities.ClusterServiceEntityPK; +import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity; import org.apache.ambari.server.orm.entities.ServiceConfigEntity; import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity; @@ -82,11 +83,6 @@ public class ServiceImpl implements Service { private final ServiceComponentFactory serviceComponentFactory; /** - * Data access object for retrieving stack instances. - */ - private final StackDAO stackDAO; - - /** * Used to publish events relating to service CRUD operations. */ private final AmbariEventPublisher eventPublisher; @@ -97,17 +93,16 @@ public class ServiceImpl implements Service { private final String serviceName; @AssistedInject - ServiceImpl(@Assisted Cluster cluster, @Assisted String serviceName, ClusterDAO clusterDAO, + ServiceImpl(@Assisted Cluster cluster, @Assisted String serviceName, + @Assisted RepositoryVersionEntity desiredRepositoryVersion, ClusterDAO clusterDAO, ClusterServiceDAO clusterServiceDAO, ServiceDesiredStateDAO serviceDesiredStateDAO, - ServiceComponentFactory serviceComponentFactory, StackDAO stackDAO, - AmbariMetaInfo ambariMetaInfo, AmbariEventPublisher eventPublisher) - throws AmbariException { + ServiceComponentFactory serviceComponentFactory, AmbariMetaInfo ambariMetaInfo, + AmbariEventPublisher eventPublisher) throws AmbariException { this.cluster = cluster; this.clusterDAO = clusterDAO; this.clusterServiceDAO = clusterServiceDAO; this.serviceDesiredStateDAO = serviceDesiredStateDAO; this.serviceComponentFactory = serviceComponentFactory; - this.stackDAO = stackDAO; this.eventPublisher = eventPublisher; this.serviceName = serviceName; this.ambariMetaInfo = ambariMetaInfo; @@ -118,15 +113,14 @@ public class ServiceImpl implements Service { ServiceDesiredStateEntity serviceDesiredStateEntity = new ServiceDesiredStateEntity(); 
serviceDesiredStateEntity.setServiceName(serviceName); serviceDesiredStateEntity.setClusterId(cluster.getClusterId()); + serviceDesiredStateEntity.setDesiredRepositoryVersion(desiredRepositoryVersion); serviceDesiredStateEntityPK = getServiceDesiredStateEntityPK(serviceDesiredStateEntity); serviceEntityPK = getServiceEntityPK(serviceEntity); serviceDesiredStateEntity.setClusterServiceEntity(serviceEntity); serviceEntity.setServiceDesiredStateEntity(serviceDesiredStateEntity); - StackId stackId = cluster.getDesiredStackVersion(); - StackEntity stackEntity = stackDAO.find(stackId.getStackName(), stackId.getStackVersion()); - serviceDesiredStateEntity.setDesiredStack(stackEntity); + StackId stackId = desiredRepositoryVersion.getStackId(); ServiceInfo sInfo = ambariMetaInfo.getService(stackId.getStackName(), stackId.getStackVersion(), serviceName); @@ -143,15 +137,13 @@ public class ServiceImpl implements Service { ServiceImpl(@Assisted Cluster cluster, @Assisted ClusterServiceEntity serviceEntity, ClusterDAO clusterDAO, ClusterServiceDAO clusterServiceDAO, ServiceDesiredStateDAO serviceDesiredStateDAO, - ServiceComponentFactory serviceComponentFactory, StackDAO stackDAO, - AmbariMetaInfo ambariMetaInfo, AmbariEventPublisher eventPublisher) - throws AmbariException { + ServiceComponentFactory serviceComponentFactory, AmbariMetaInfo ambariMetaInfo, + AmbariEventPublisher eventPublisher) throws AmbariException { this.cluster = cluster; this.clusterDAO = clusterDAO; this.clusterServiceDAO = clusterServiceDAO; this.serviceDesiredStateDAO = serviceDesiredStateDAO; this.serviceComponentFactory = serviceComponentFactory; - this.stackDAO = stackDAO; this.eventPublisher = eventPublisher; serviceName = serviceEntity.getServiceName(); this.ambariMetaInfo = ambariMetaInfo; @@ -309,37 +301,46 @@ public class ServiceImpl implements Service { serviceDesiredStateDAO.merge(serviceDesiredStateEntity); } + /** + * {@inheritDoc} + */ @Override public StackId getDesiredStackVersion() { 
ServiceDesiredStateEntity serviceDesiredStateEntity = getServiceDesiredStateEntity(); StackEntity desiredStackEntity = serviceDesiredStateEntity.getDesiredStack(); - if( null != desiredStackEntity ) { - return new StackId(desiredStackEntity); - } else { - return null; - } + return new StackId(desiredStackEntity); } + /** + * {@inheritDoc} + */ @Override - public void setDesiredStackVersion(StackId stack) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting DesiredStackVersion of Service" + ", clusterName=" - + cluster.getClusterName() + ", clusterId=" - + cluster.getClusterId() + ", serviceName=" + getName() - + ", oldDesiredStackVersion=" + getDesiredStackVersion() - + ", newDesiredStackVersion=" + stack); - } + public RepositoryVersionEntity getDesiredRepositoryVersion() { + ServiceDesiredStateEntity serviceDesiredStateEntity = getServiceDesiredStateEntity(); + return serviceDesiredStateEntity.getDesiredRepositoryVersion(); + } - StackEntity stackEntity = stackDAO.find(stack.getStackName(), stack.getStackVersion()); + /** + * {@inheritDoc} + */ + @Override + @Transactional + public void setDesiredRepositoryVersion(RepositoryVersionEntity repositoryVersionEntity) { ServiceDesiredStateEntity serviceDesiredStateEntity = getServiceDesiredStateEntity(); - serviceDesiredStateEntity.setDesiredStack(stackEntity); + serviceDesiredStateEntity.setDesiredRepositoryVersion(repositoryVersionEntity); serviceDesiredStateDAO.merge(serviceDesiredStateEntity); + + Collection<ServiceComponent> components = getServiceComponents().values(); + for (ServiceComponent component : components) { + component.setDesiredRepositoryVersion(repositoryVersionEntity); + } } @Override public ServiceResponse convertToResponse() { ServiceResponse r = new ServiceResponse(cluster.getClusterId(), cluster.getClusterName(), - getName(), getDesiredStackVersion().getStackId(), getDesiredState().toString(), + getName(), getDesiredStackVersion().getStackId(), + getDesiredRepositoryVersion().getVersion(), 
getDesiredState().toString(), isCredentialStoreSupported(), isCredentialStoreEnabled()); r.setMaintenanceState(getMaintenanceState().name()); @@ -612,10 +613,6 @@ public class ServiceImpl implements Service { return getServiceDesiredStateEntity().getMaintenanceState(); } - private ClusterServiceEntity getServiceEntity() { - return clusterServiceDAO.findByPK(serviceEntityPK); - } - private ClusterServiceEntityPK getServiceEntityPK(ClusterServiceEntity serviceEntity) { ClusterServiceEntityPK pk = new ClusterServiceEntityPK(); pk.setClusterId(serviceEntity.getClusterId()); http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java index 97f5003..f1bd900 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeContext.java @@ -131,6 +131,13 @@ public class UpgradeContext { */ private StackId m_targetStackId; + /** + * The target repository before the upgrade started. This is the same + * regardless of whether the current direction is {@link Direction#UPGRADE} or + * {@link Direction#DOWNGRADE}. 
+ */ + private RepositoryVersionEntity m_targetRepositoryVersion; + private MasterHostResolver m_resolver; private AmbariMetaInfo m_metaInfo; private List<ServiceComponentHost> m_unhealthy = new ArrayList<>(); @@ -214,7 +221,7 @@ public class UpgradeContext { m_upgradeRequestMap = upgradeRequestMap; // sets the original/target stacks - requires direction and cluster - setSourceAndTargetStacks(); + setSourceAndTargetVersions(); } /** @@ -239,7 +246,7 @@ public class UpgradeContext { m_version = upgradeEntity.getToVersion(); // sets the original/target stacks - requires direction and cluster - setSourceAndTargetStacks(); + setSourceAndTargetVersions(); if (m_direction == Direction.DOWNGRADE) { m_downgradeFromVersion = upgradeEntity.getFromVersion(); @@ -248,16 +255,18 @@ public class UpgradeContext { // since this constructor is initialized from an entity, then this map is // not present m_upgradeRequestMap = Collections.emptyMap(); + + m_autoSkipComponentFailures = upgradeEntity.isComponentFailureAutoSkipped(); + m_autoSkipServiceCheckFailures = upgradeEntity.isServiceCheckFailureAutoSkipped(); } /** - * Sets the source and target stack IDs. This will also set the effective - * stack ID based on the already-set {@link UpgradeType} and - * {@link Direction}. + * Sets the source and target versions. This will also set the effective stack + * ID based on the already-set {@link UpgradeType} and {@link Direction}. 
* * @see #getEffectiveStackId() */ - private void setSourceAndTargetStacks() { + private void setSourceAndTargetVersions() { StackId sourceStackId = null; // taret stack will not always be what it is today - tagging as experimental @@ -268,17 +277,20 @@ public class UpgradeContext { case UPGRADE: sourceStackId = m_cluster.getCurrentStackVersion(); - RepositoryVersionEntity targetRepositoryVersion = m_repoVersionDAO.findByStackNameAndVersion( + m_targetRepositoryVersion = m_repoVersionDAO.findByStackNameAndVersion( sourceStackId.getStackName(), m_version); // !!! TODO check the repo_version for patch-ness and restrict the // context to those services that require it. Consult the version // definition and add the service names to supportedServices - targetStackId = targetRepositoryVersion.getStackId(); + targetStackId = m_targetRepositoryVersion.getStackId(); break; case DOWNGRADE: sourceStackId = m_cluster.getCurrentStackVersion(); targetStackId = m_cluster.getDesiredStackVersion(); + + m_targetRepositoryVersion = m_repoVersionDAO.findByStackNameAndVersion( + targetStackId.getStackName(), m_version); break; } @@ -436,11 +448,13 @@ public class UpgradeContext { } /** - * @param targetStackId - * the targetStackId to set + * Gets the target repository version for this upgrade. + * + * @return the target repository version for this upgrade (never + * {@code null}). 
*/ - public void setTargetStackId(StackId targetStackId) { - m_targetStackId = targetStackId; + public RepositoryVersionEntity getTargetRepositoryVersion() { + return m_targetRepositoryVersion; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java index 92e01c2..bb84fb7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java @@ -21,6 +21,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -28,6 +29,8 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.internal.TaskResourceProvider; @@ -722,20 +725,34 @@ public class UpgradeHelper { * the upgrade state individually, we wrap this method inside of a transaction * to prevent 1000's of transactions from being opened and committed. * - * @param version - * desired version (like 2.2.1.0-1234) for upgrade - * @param targetServices - * targets for upgrade - * @param targetStack - * the target stack for the components. Express and Rolling upgrades determine - * the "correct" stack differently, so the component's desired stack id is not - * a reliable indicator. 
+ * @param upgradeContext + * the upgrade context (not {@code null}). */ @Transactional - public void putComponentsToUpgradingState(String version, - Map<Service, Set<ServiceComponent>> targetServices, StackId targetStack) throws AmbariException { + @Experimental(feature = ExperimentalFeature.PATCH_UPGRADES) + public void putComponentsToUpgradingState(UpgradeContext upgradeContext) throws AmbariException { + + // determine which services/components will participate in the upgrade + Cluster cluster = upgradeContext.getCluster(); + Set<Service> services = new HashSet<>(cluster.getServices().values()); + Map<Service, Set<ServiceComponent>> targetServices = new HashMap<>(); + for (Service service : services) { + if (upgradeContext.isServiceSupported(service.getName())) { + Set<ServiceComponent> serviceComponents = new HashSet<>( + service.getServiceComponents().values()); + + targetServices.put(service, serviceComponents); + } + } + + RepositoryVersionEntity targetRepositoryVersion = upgradeContext.getTargetRepositoryVersion(); + StackId targetStack = targetRepositoryVersion.getStackId(); for (Map.Entry<Service, Set<ServiceComponent>> entry: targetServices.entrySet()) { + // set service desired repo + Service service = entry.getKey(); + service.setDesiredRepositoryVersion(targetRepositoryVersion); + for (ServiceComponent serviceComponent: entry.getValue()) { boolean versionAdvertised = false; @@ -751,25 +768,25 @@ public class UpgradeHelper { StackVersionListener.UNKNOWN_VERSION); } - UpgradeState upgradeState = UpgradeState.IN_PROGRESS; - String desiredVersion = version; - + UpgradeState upgradeStateToSet = UpgradeState.IN_PROGRESS; if (!versionAdvertised) { - upgradeState = UpgradeState.NONE; - desiredVersion = StackVersionListener.UNKNOWN_VERSION; + upgradeStateToSet = UpgradeState.NONE; } for (ServiceComponentHost serviceComponentHost: serviceComponent.getServiceComponentHosts().values()) { - serviceComponentHost.setUpgradeState(upgradeState); + if 
(serviceComponentHost.getUpgradeState() != upgradeStateToSet) { + serviceComponentHost.setUpgradeState(upgradeStateToSet); + } // !!! if we aren't version advertised, but there IS a version, set it. - if (!versionAdvertised && - !serviceComponentHost.getVersion().equals(StackVersionListener.UNKNOWN_VERSION)) { + if (!versionAdvertised && !StringUtils.equals(StackVersionListener.UNKNOWN_VERSION, + serviceComponentHost.getVersion())) { serviceComponentHost.setVersion(StackVersionListener.UNKNOWN_VERSION); } } - serviceComponent.setDesiredVersion(desiredVersion); + // set component desired repo + serviceComponent.setDesiredRepositoryVersion(targetRepositoryVersion); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbd826c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 228cf79..21c275b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -42,6 +42,8 @@ import javax.annotation.Nullable; import javax.persistence.EntityManager; import javax.persistence.RollbackException; +import org.apache.ambari.annotations.Experimental; +import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ConfigGroupNotFoundException; import org.apache.ambari.server.DuplicateResourceException; @@ -161,7 +163,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import com.google.common.eventbus.Subscribe; import
com.google.inject.Inject; import com.google.inject.Injector; @@ -880,14 +881,20 @@ public class ClusterImpl implements Cluster { services.put(service.getName(), service); } + /** + * {@inheritDoc} + */ @Override - public Service addService(String serviceName) throws AmbariException { + public Service addService(String serviceName, RepositoryVersionEntity repositoryVersion) throws AmbariException { if (services.containsKey(serviceName)) { - throw new AmbariException("Service already exists" + ", clusterName=" + getClusterName() - + ", clusterId=" + getClusterId() + ", serviceName=" + serviceName); + String message = MessageFormat.format("The {0} service already exists in {1}", serviceName, + getClusterName()); + + throw new AmbariException(message); } - Service service = serviceFactory.createNew(this, serviceName); + @Experimental(feature = ExperimentalFeature.PATCH_UPGRADES) + Service service = serviceFactory.createNew(this, serviceName, repositoryVersion); addService(service); return service; @@ -915,11 +922,6 @@ public class ClusterImpl implements Cluster { @Override public void setDesiredStackVersion(StackId stackId) throws AmbariException { - setDesiredStackVersion(stackId, false); - } - - @Override - public void setDesiredStackVersion(StackId stackId, boolean cascade) throws AmbariException { clusterGlobalLock.writeLock().lock(); try { if (LOG.isDebugEnabled()) { @@ -937,19 +939,6 @@ public class ClusterImpl implements Cluster { clusterEntity.setDesiredStack(stackEntity); clusterEntity = clusterDAO.merge(clusterEntity); - if (cascade) { - for (Service service : getServices().values()) { - service.setDesiredStackVersion(stackId); - - for (ServiceComponent sc : service.getServiceComponents().values()) { - sc.setDesiredStackVersion(stackId); - - for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) { - sch.setDesiredStackVersion(stackId); - } - } - } - } loadServiceConfigTypes(); } finally { clusterGlobalLock.writeLock().unlock(); @@ -1014,6 
+1003,7 @@ public class ClusterImpl implements Cluster { * @return */ @Override + @Experimental(feature = ExperimentalFeature.PATCH_UPGRADES) public ClusterVersionEntity getCurrentClusterVersion() { Collection<ClusterVersionEntity> clusterVersionEntities = getClusterEntity().getClusterVersionEntities(); for (ClusterVersionEntity clusterVersionEntity : clusterVersionEntities) { @@ -1022,6 +1012,11 @@ public class ClusterImpl implements Cluster { return clusterVersionEntity; } } + + if( clusterVersionEntities.size() == 1 ) { + return clusterVersionEntities.iterator().next(); + } + return null; } @@ -1120,81 +1115,6 @@ public class ClusterImpl implements Cluster { } /** - * During the Finalize Action, want to transition all Host Versions from INSTALLED to CURRENT, and the last CURRENT one to INSTALLED. - * @param hostNames Collection of host names - * @param currentClusterVersion Entity that contains the cluster's current stack (with its name and version) - * @param desiredState Desired state must be {@link RepositoryVersionState#CURRENT} - * @throws AmbariException - */ - @Override - public void mapHostVersions(Set<String> hostNames, ClusterVersionEntity currentClusterVersion, RepositoryVersionState desiredState) throws AmbariException { - if (currentClusterVersion == null) { - throw new AmbariException("Could not find current stack version of cluster " + getClusterName()); - } - - final Set<RepositoryVersionState> validStates = Sets.newHashSet(RepositoryVersionState.CURRENT); - - if (!validStates.contains(desiredState)) { - throw new AmbariException("The state must be one of [" + StringUtils.join(validStates, ", ") + "]"); - } - - clusterGlobalLock.writeLock().lock(); - try { - StackEntity repoVersionStackEntity = currentClusterVersion.getRepositoryVersion().getStack(); - StackId repoVersionStackId = new StackId(repoVersionStackEntity); - - Map<String, HostVersionEntity> existingHostToHostVersionEntity = new HashMap<>(); - List<HostVersionEntity> 
existingHostVersionEntities = hostVersionDAO.findByClusterStackAndVersion( - getClusterName(), repoVersionStackId, - currentClusterVersion.getRepositoryVersion().getVersion()); - - if (existingHostVersionEntities != null) { - for (HostVersionEntity entity : existingHostVersionEntities) { - existingHostToHostVersionEntity.put(entity.getHostName(), entity); - } - } - - Sets.SetView<String> intersection = Sets.intersection( - existingHostToHostVersionEntity.keySet(), hostNames); - - for (String hostname : hostNames) { - List<HostVersionEntity> currentHostVersions = hostVersionDAO.findByClusterHostAndState( - getClusterName(), hostname, RepositoryVersionState.CURRENT); - HostVersionEntity currentHostVersionEntity = (currentHostVersions != null && currentHostVersions.size() == 1) ? currentHostVersions.get(0) - : null; - - // Notice that if any hosts already have the desired stack and version, regardless of the state, we try - // to be robust and only insert records for the missing hosts. - if (!intersection.contains(hostname)) { - // According to the business logic, we don't create objects in a CURRENT state. - HostEntity hostEntity = hostDAO.findByName(hostname); - HostVersionEntity hostVersionEntity = new HostVersionEntity(hostEntity, currentClusterVersion.getRepositoryVersion(), desiredState); - hostVersionDAO.create(hostVersionEntity); - } else { - HostVersionEntity hostVersionEntity = existingHostToHostVersionEntity.get(hostname); - if (hostVersionEntity.getState() != desiredState) { - hostVersionEntity.setState(desiredState); - hostVersionEntity = hostVersionDAO.merge(hostVersionEntity); - } - - // Maintain the invariant that only one HostVersionEntity is allowed - // to have a state of CURRENT. 
- if (currentHostVersionEntity != null - && !currentHostVersionEntity.getRepositoryVersion().equals( - hostVersionEntity.getRepositoryVersion()) - && desiredState == RepositoryVersionState.CURRENT - && currentHostVersionEntity.getState() == RepositoryVersionState.CURRENT) { - currentHostVersionEntity.setState(RepositoryVersionState.INSTALLED); - hostVersionDAO.merge(currentHostVersionEntity); - } - } - } - } finally { - clusterGlobalLock.writeLock().unlock(); - } - } - - /** * {@inheritDoc} */ @Override
