Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java (original) +++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java Sat Mar 16 03:39:02 2013 @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.agent; +import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Singleton; @@ -66,7 +67,9 @@ public class HeartBeatHandler { @Inject ActionMetadata actionMetadata; @Inject - HBaseMasterPortScanner scaner; + HBaseMasterPortScanner scanner; + @Inject + private Gson gson; private Map<String, Long> hostResponseIds = new HashMap<String, Long>(); private Map<String, HeartBeatResponse> hostResponses = new HashMap<String, HeartBeatResponse>(); @@ -78,7 +81,7 @@ public class HeartBeatHandler { this.actionQueue = aq; this.actionManager = am; this.heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000); - this.heartbeatMonitor.setScaner(scaner); + this.heartbeatMonitor.setScanner(scanner); injector.injectMembers(this); } @@ -151,7 +154,7 @@ public class HeartBeatHandler { hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null)); } - if(hostState != hostObject.getState()) scaner.updateHBaseMaster(hostObject); + if(hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject); } catch (InvalidStateTransitionException ex) { LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex); hostObject.setState(HostState.INIT); @@ -205,7 +208,9 @@ public class HeartBeatHandler { scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName, hostname, now)); } - if(state != scHost.getState() && schName.equals(Role.HBASE_MASTER.toString())) scaner.updateHBaseMaster(cl); + if(state != scHost.getState() && schName.equals(Role.HBASE_MASTER.toString())) { + scanner.updateHBaseMaster(cl); + } } catch (ServiceComponentNotFoundException scnex) { LOG.warn("Service component not found ", scnex); } catch (InvalidStateTransitionException ex) { @@ -220,7 +225,7 @@ public class HeartBeatHandler { protected void processStatusReports(HeartBeat heartbeat, String hostname, Clusters clusterFsm) throws - AmbariException { + AmbariException { Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); for (Cluster cl : clusters) { for (ComponentStatus status : heartbeat.componentStatus) { @@ -230,9 +235,9 @@ public class HeartBeatHandler { String componentName = status.getComponentName(); if (svc.getServiceComponents().containsKey(componentName)) { ServiceComponent svcComp = svc.getServiceComponent( - componentName); + componentName); ServiceComponentHost scHost = svcComp.getServiceComponentHost( - hostname); + hostname); State prevState = scHost.getState(); State liveState = State.valueOf(State.class, status.getStatus()); if (prevState.equals(State.INSTALLED) @@ -242,62 +247,58 @@ public class HeartBeatHandler { || prevState.equals(State.STOPPING) || prevState.equals(State.STOP_FAILED)) { scHost.setState(liveState); - LOG.info("State of service component " + componentName - + " of service " + status.getServiceName() - + " of cluster " + status.getClusterName() - + " has changed from " + prevState + " to " + liveState - + " at host " + hostname); - if (!prevState.equals(liveState) - && scHost.getServiceComponentName().equals(Role.HBASE_MASTER.toString())) { - scaner.updateHBaseMaster(scHost); + if (!prevState.equals(liveState)) { + LOG.info("State of service component " + componentName + + " of service " + status.getServiceName() + + " of cluster " + status.getClusterName() + + " has changed from " + prevState + " to " + liveState + + " at host " + hostname); + if (scHost.getServiceComponentName().equals(Role.HBASE_MASTER.toString())) { + scanner.updateHBaseMaster(scHost); + } } } - if(null != status.getStackVersion() && !status.getStackVersion().isEmpty()) - { - scHost.setStackVersion(new StackId(status.getStackVersion())); + if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) { + scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class)); } // TODO need to get config version and stack version from live state } else { // TODO: What should be done otherwise? } - } - catch (ServiceNotFoundException e) { + } catch (ServiceNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" - + " service" - + ", clusterName=" + status.getClusterName() - + ", serviceName=" + status.getServiceName()); + + " service" + + ", clusterName=" + status.getClusterName() + + ", serviceName=" + status.getServiceName()); // FIXME ignore invalid live update and continue for now? continue; - } - catch (ServiceComponentNotFoundException e) { + } catch (ServiceComponentNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" - + " servicecomponent" - + ", clusterName=" + status.getClusterName() - + ", serviceName=" + status.getServiceName() - + ", componentName=" + status.getComponentName()); + + " servicecomponent" + + ", clusterName=" + status.getClusterName() + + ", serviceName=" + status.getServiceName() + + ", componentName=" + status.getComponentName()); // FIXME ignore invalid live update and continue for now? continue; - } - catch (ServiceComponentHostNotFoundException e) { + } catch (ServiceComponentHostNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" - + " service" - + ", clusterName=" + status.getClusterName() - + ", serviceName=" + status.getServiceName() - + ", componentName=" + status.getComponentName() - + ", hostname=" + hostname); + + " service" + + ", clusterName=" + status.getClusterName() + + ", serviceName=" + status.getServiceName() + + ", componentName=" + status.getComponentName() + + ", hostname=" + hostname); // FIXME ignore invalid live update and continue for now? continue; - } - catch (RuntimeException e) { + } catch (RuntimeException e) { LOG.warn("Received a live status with invalid payload" - + " service" - + ", clusterName=" + status.getClusterName() - + ", serviceName=" + status.getServiceName() - + ", componentName=" + status.getComponentName() - + ", hostname=" + hostname - + ", error=" + e.getMessage()); + + " service" + + ", clusterName=" + status.getClusterName() + + ", serviceName=" + status.getServiceName() + + ", componentName=" + status.getComponentName() + + ", hostname=" + hostname + + ", error=" + e.getMessage()); continue; } }
Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java (original) +++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java Sat Mar 16 03:39:02 2013 @@ -47,10 +47,10 @@ public class HeartbeatMonitor implements private final int threadWakeupInterval; //1 minute private volatile boolean shouldRun = true; private Thread monitorThread = null; - private HBaseMasterPortScanner scaner; + private HBaseMasterPortScanner scanner; - public void setScaner(HBaseMasterPortScanner scaner) { - this.scaner = scaner; + public void setScanner(HBaseMasterPortScanner scanner) { + this.scanner = scanner; } public HeartbeatMonitor(Clusters fsm, ActionQueue aq, ActionManager am, @@ -116,7 +116,7 @@ public class HeartbeatMonitor implements LOG.warn("Hearbeat lost from host "+host); //Heartbeat is expired hostObj.handleEvent(new HostHeartbeatLostEvent(host)); - if(hostState != hostObj.getState() && scaner != null) scaner.updateHBaseMaster(hostObj); + if(hostState != hostObj.getState() && scanner != null) scanner.updateHBaseMaster(hostObj); //Purge action queue actionQueue.dequeueAll(host); //notify action manager Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java (original) +++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java Sat Mar 16 03:39:02 2013 @@ -30,7 +30,23 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; -import org.apache.ambari.server.*; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.ClusterNotFoundException; +import org.apache.ambari.server.DuplicateResourceException; +import org.apache.ambari.server.HostNotFoundException; +import org.apache.ambari.server.ObjectNotFoundException; +import org.apache.ambari.server.ParentObjectNotFoundException; +import org.apache.ambari.server.Role; +import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.ServiceComponentHostNotFoundException; +import org.apache.ambari.server.ServiceComponentNotFoundException; +import org.apache.ambari.server.ServiceNotFoundException; +import org.apache.ambari.server.StackAccessException; import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.RequestStatus; @@ -45,19 +61,40 @@ import org.apache.ambari.server.security import org.apache.ambari.server.security.authorization.Users; import org.apache.ambari.server.serveraction.ServerAction; import org.apache.ambari.server.stageplanner.RoleGraph; -import org.apache.ambari.server.state.*; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ComponentInfo; +import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigFactory; +import org.apache.ambari.server.state.DesiredConfig; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.OperatingSystemInfo; +import org.apache.ambari.server.state.PropertyInfo; +import org.apache.ambari.server.state.RepositoryInfo; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentFactory; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostEvent; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.ServiceFactory; +import org.apache.ambari.server.state.ServiceInfo; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.StackInfo; +import org.apache.ambari.server.state.State; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; -import org.apache.ambari.server.state.svccomphost.*; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostMaintenanceEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostRestoreEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStopEvent; +import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent; import org.apache.ambari.server.utils.StageUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; - @Singleton public class AmbariManagementControllerImpl implements AmbariManagementController { @@ -103,7 +140,6 @@ public class AmbariManagementControllerI @Inject private Configuration configs; - final private String masterHostname; final private static String JDK_RESOURCE_LOCATION = @@ -1388,11 +1424,17 @@ public class AmbariManagementControllerI boolean requiresVersionUpdate = requestedVersionString != null && !requestedVersionString.isEmpty(); if (requiresVersionUpdate) { + LOG.info("Received a cluster update request" + + ", clusterName=" + request.getClusterName() + + ", request=" + request); requestedVersion = new StackId(requestedVersionString); if (!requestedVersion.getStackName().equals(currentVersion.getStackName())) { throw new AmbariException("Upgrade not possible between different stacks."); } requiresVersionUpdate = !currentVersion.equals(requestedVersion); + if(!requiresVersionUpdate) { + LOG.info("The cluster is already at " + currentVersion); + } } if (requiresVersionUpdate && requiresHostListUpdate) { @@ -1407,6 +1449,7 @@ public class AmbariManagementControllerI } if (requiresVersionUpdate) { + LOG.info("Upgrade cluster request received for stack " + requestedVersion); boolean retry = false; if (0 == currentVersion.compareTo(desiredVersion)) { if (1 != requestedVersion.compareTo(currentVersion)) { @@ -1427,6 +1470,7 @@ public class AmbariManagementControllerI } } else { retry = true; + LOG.info("Received upgrade request is a retry."); if (0 != requestedVersion.compareTo(desiredVersion)) { throw new AmbariException("Upgrade in progress to target version : " + desiredVersion @@ -1472,11 +1516,13 @@ public class AmbariManagementControllerI Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts = new HashMap<String, Map<State, List<ServiceComponentHost>>>(); + LOG.info("Identifying components to upgrade."); fillComponentsToUpgrade(request, cluster, changedServices, changedComps, changedScHosts); Map<String, String> requestParameters = new HashMap<String, String>(); requestParameters.put(Configuration.UPGRADE_TO_STACK, gson.toJson(requestedVersion)); requestParameters.put(Configuration.UPGRADE_FROM_STACK, gson.toJson(currentVersion)); + LOG.info("Creating stages for upgrade."); List<Stage> stages = doStageCreation(cluster, changedServices, changedComps, changedScHosts, requestParameters); @@ -1487,7 +1533,9 @@ public class AmbariManagementControllerI addFinalizeUpgradeAction(cluster, stages); persistStages(stages); updateServiceStates(changedServices, changedComps, changedScHosts); - return getRequestStatusResponse(stages.get(0).getRequestId()); + long requestId = stages.get(0).getRequestId(); + LOG.info(stages.size() + " stages created for upgrade and the request id is " + requestId); + return getRequestStatusResponse(requestId); } return null; @@ -2895,6 +2943,9 @@ public class AmbariManagementControllerI } } + // If upgrade request comes without state information then its an error + boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch); + if (newState == null) { if (LOG.isDebugEnabled()) { LOG.debug("Nothing to do for new updateServiceComponentHost request" @@ -2915,8 +2966,6 @@ public class AmbariManagementControllerI seenNewStates.add(newState); - boolean upgradeRequest = checkIfUpgradeRequestAndValidate(request, cluster, s, sc, sch); - if (!processingUpgradeRequest && upgradeRequest) { processingUpgradeRequest = true; // this needs to be the first request @@ -3185,19 +3234,29 @@ public class AmbariManagementControllerI throw getHostComponentUpgradeException(request, cluster, s, sc, sch, "Upgrade cannot be accompanied with config modification"); } - if (!request.getDesiredState().equals(State.INSTALLED.toString())) { + if (request.getDesiredState() == null + || !request.getDesiredState().equals(State.INSTALLED.toString())) { throw getHostComponentUpgradeException(request, cluster, s, sc, sch, "The desired state for an upgrade request must be " + State.INSTALLED); } + LOG.info("Received upgrade request to " + requestedStackId + " for " + + "component " + sch.getServiceComponentName() + + " on " + sch.getHostName()); + } else { + LOG.info("Stack id " + requestedStackId + " provided in the request matches" + + " the current stack id of the " + + "component " + sch.getServiceComponentName() + + " on " + sch.getHostName() + ". It will not be upgraded."); } } return isUpgradeRequest; } - private AmbariException getHostComponentUpgradeException(ServiceComponentHostRequest request, Cluster cluster, - Service s, ServiceComponent sc, ServiceComponentHost sch, - String message) throws AmbariException { + private AmbariException getHostComponentUpgradeException( + ServiceComponentHostRequest request, Cluster cluster, + Service s, ServiceComponent sc, ServiceComponentHost sch, + String message) throws AmbariException { return new AmbariException(message + ", clusterName=" + cluster.getClusterName() + ", clusterId=" + cluster.getClusterId() Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java (original) +++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterResourceProvider.java Sat Mar 16 03:39:02 2013 @@ -140,11 +140,14 @@ class ClusterResourceProvider extends Ab public RequestStatus updateResources(Request request, Predicate predicate) throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { - - for (Map<String, Object> propertyMap : getPropertyMaps(request.getProperties().iterator().next(), predicate)) { + RequestStatusResponse response = null; + Set<Map<String, Object>> propertyMaps = getPropertyMaps(request.getProperties().iterator().next(), predicate); + if (propertyMaps.size() > 1) { + throw new SystemException("Single update request cannot modify multiple clusters.", null); + } + for (Map<String, Object> propertyMap : propertyMaps) { final ClusterRequest clusterRequest = getRequest(propertyMap); - - modifyResources(new Command<RequestStatusResponse>() { + response = modifyResources(new Command<RequestStatusResponse>() { @Override public RequestStatusResponse invoke() throws AmbariException { return getManagementController().updateCluster(clusterRequest); @@ -152,7 +155,7 @@ class ClusterResourceProvider extends Ab }); } notifyUpdate(Resource.Type.Cluster, request, predicate); - return getRequestStatus(null); + return getRequestStatus(response); } @Override Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java (original) +++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java Sat Mar 16 03:39:02 2013 @@ -737,12 +737,12 @@ public class TestHeartbeatHandler { hb.setReports(new ArrayList<CommandReport>()); ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>(); - ComponentStatus componentStatus1 = - createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, DATANODE, "HDP-1.3.0"); + ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, + DATANODE, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}"); ComponentStatus componentStatus2 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED, NAMENODE, ""); - ComponentStatus componentStatus3 = - createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED, HDFS_CLIENT, "HDP-1.3.0"); + ComponentStatus componentStatus3 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.INSTALLED, + HDFS_CLIENT, "{\"stackName\":\"HDP\",\"stackVersion\":\"1.3.0\"}"); componentStatuses.add(componentStatus1); componentStatuses.add(componentStatus2); Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java?rev=1457194&r1=1457193&r2=1457194&view=diff ============================================================================== --- incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java (original) +++ incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java Sat Mar 16 03:39:02 2013 @@ -3157,6 +3157,15 @@ public class AmbariManagementControllerT updateHostAndCompareExpectedFailure(reqs, "The desired state for an upgrade request must be"); c1.setCurrentStackVersion(new StackId("HDP-0.2")); + sch1.setState(State.UPGRADING); + reqs.clear(); + req1 = new ServiceComponentHostRequest(clusterName, serviceName1, + componentName1, host1, null, null); + req1.setDesiredStackId("HDP-0.2"); + reqs.add(req1); + updateHostAndCompareExpectedFailure(reqs, "The desired state for an upgrade request must be"); + + c1.setCurrentStackVersion(new StackId("HDP-0.2")); sch1.setState(State.INSTALLED); sch1.setDesiredState(State.INSTALLED); sch2.setState(State.INSTALLED); @@ -3174,7 +3183,8 @@ public class AmbariManagementControllerT updateHostAndCompareExpectedFailure(reqs, "An upgrade request cannot be combined with other"); c1.setCurrentStackVersion(new StackId("HDP-0.2")); - sch1.setState(State.UPGRADING); + sch1.setState(State.INSTALLED); + sch1.setStackVersion(new StackId("HDP-0.2")); reqs.clear(); req1 = new ServiceComponentHostRequest(clusterName, serviceName1, componentName1, host1, @@ -3183,6 +3193,18 @@ public class AmbariManagementControllerT reqs.add(req1); RequestStatusResponse resp = controller.updateHostComponents(reqs); Assert.assertNull(resp); + + c1.setCurrentStackVersion(new StackId("HDP-0.2")); + sch1.setState(State.INSTALLED); + sch1.setStackVersion(new StackId("HDP-0.2")); + reqs.clear(); + req1 = new ServiceComponentHostRequest(clusterName, serviceName1, + componentName1, host1, + null, State.INSTALLED.toString()); + req1.setDesiredStackId("HDP-0.2"); + reqs.add(req1); + resp = controller.updateHostComponents(reqs); + Assert.assertNull(resp); } private void updateHostAndCompareExpectedFailure(Set<ServiceComponentHostRequest> reqs,
