Author: smohanty
Date: Mon Apr 8 16:53:32 2013
New Revision: 1465681
URL: http://svn.apache.org/r1465681
Log:
AMBARI-1789. Stopping and then Starting all services doesn't start NameNode.
(smohanty)
Added:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStartedEvent.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStoppedEvent.java
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py
incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Mon Apr 8 16:53:32 2013
@@ -656,6 +656,9 @@ Trunk (unreleased changes):
BUG FIXES
+ AMBARI-1789. Stopping and then Starting all services doesn't start
+ NameNode. (smohanty)
+
AMBARI-1822. Hue service link points to wrong URL and no smoke test drop
down is shown. (yusaku)
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
Mon Apr 8 16:53:32 2013
@@ -46,13 +46,13 @@ installScriptHash = -1
class ActionQueue(threading.Thread):
""" Action Queue for the agent. We pick one command at a time from the queue
and execute that """
-
+
commandQueue = Queue.Queue()
resultQueue = Queue.Queue()
- STATUS_COMMAND='STATUS_COMMAND'
- EXECUTION_COMMAND='EXECUTION_COMMAND'
- UPGRADE_STATUS='UPGRADE'
+ STATUS_COMMAND = 'STATUS_COMMAND'
+ EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+ UPGRADE_STATUS = 'UPGRADE'
IDLE_SLEEP_TIME = 5
@@ -61,17 +61,17 @@ class ActionQueue(threading.Thread):
self.config = config
self.sh = shellRunner()
self._stop = threading.Event()
- self.maxRetries = config.getint('command', 'maxretries')
+ self.maxRetries = config.getint('command', 'maxretries')
self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
self.puppetExecutor = PuppetExecutor.PuppetExecutor(
- config.get('puppet', 'puppetmodules'),
- config.get('puppet', 'puppet_home'),
- config.get('puppet', 'facter_home'),
- config.get('agent', 'prefix'), config)
+ config.get('puppet', 'puppetmodules'),
+ config.get('puppet', 'puppet_home'),
+ config.get('puppet', 'facter_home'),
+ config.get('agent', 'prefix'), config)
self.pythonExecutor = PythonExecutor.PythonExecutor(
- config.get('agent', 'prefix'), config)
+ config.get('agent', 'prefix'), config)
self.upgradeExecutor = UpgradeExecutor.UpgradeExecutor(self.pythonExecutor,
- self.puppetExecutor, config)
+ self.puppetExecutor, config)
self.tmpdir = config.get('agent', 'prefix')
self.commandInProgress = None
@@ -124,7 +124,7 @@ class ActionQueue(threading.Thread):
result = livestatus.build()
logger.debug("Got live status for component " + component +\
" of service " + str(service) +\
- " of cluster " + str(cluster))
+ " of cluster " + str(cluster))
logger.debug(pprint.pformat(result))
if result is not None:
self.resultQueue.put((ActionQueue.STATUS_COMMAND, result))
@@ -132,9 +132,9 @@ class ActionQueue(threading.Thread):
except Exception, err:
traceback.print_exc()
logger.warn(err)
- pass
+ pass
else:
- logger.warn("Unrecognized command " + pprint.pformat(result))
+ logger.warn("Unrecognized command " + pprint.pformat(command))
if not self.stopped():
time.sleep(self.IDLE_SLEEP_TIME)
@@ -152,29 +152,30 @@ class ActionQueue(threading.Thread):
# Building report for command in progress
if self.commandInProgress is not None:
try:
- tmpout= open(self.commandInProgress['tmpout'], 'r').read()
- tmperr= open(self.commandInProgress['tmperr'], 'r').read()
+ tmpout = open(self.commandInProgress['tmpout'], 'r').read()
+ tmperr = open(self.commandInProgress['tmperr'], 'r').read()
except Exception, err:
logger.warn(err)
- tmpout='...'
- tmperr='...'
+ tmpout = '...'
+ tmperr = '...'
grep = Grep()
output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
inprogress = {
- 'role' : self.commandInProgress['role'],
- 'actionId' : self.commandInProgress['actionId'],
- 'taskId' : self.commandInProgress['taskId'],
- 'stdout' : grep.filterMarkup(output),
- 'clusterName' : self.commandInProgress['clusterName'],
- 'stderr' : tmperr,
- 'exitCode' : 777,
- 'serviceName' : self.commandInProgress['serviceName'],
- 'status' : 'IN_PROGRESS'
+ 'role': self.commandInProgress['role'],
+ 'actionId': self.commandInProgress['actionId'],
+ 'taskId': self.commandInProgress['taskId'],
+ 'stdout': grep.filterMarkup(output),
+ 'clusterName': self.commandInProgress['clusterName'],
+ 'stderr': tmperr,
+ 'exitCode': 777,
+ 'serviceName': self.commandInProgress['serviceName'],
+ 'status': 'IN_PROGRESS',
+ 'roleCommand': self.commandInProgress['roleCommand']
}
resultReports.append(inprogress)
- result={
- 'reports' : resultReports,
- 'componentStatus' : resultComponentStatus
+ result = {
+ 'reports': resultReports,
+ 'componentStatus': resultComponentStatus
}
return result
@@ -197,36 +198,40 @@ class ActionQueue(threading.Thread):
taskId = command['taskId']
# Preparing 'IN_PROGRESS' report
self.commandInProgress = {
- 'role' : command['role'],
- 'actionId' : commandId,
- 'taskId' : taskId,
- 'clusterName' : clusterName,
- 'serviceName' : serviceName,
+ 'role': command['role'],
+ 'actionId': commandId,
+ 'taskId': taskId,
+ 'clusterName': clusterName,
+ 'serviceName': serviceName,
'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
- 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt'
+ 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+ 'roleCommand': roleCommand
}
# running command
if command['commandType'] == ActionQueue.EXECUTION_COMMAND:
if command['roleCommand'] == ActionQueue.UPGRADE_STATUS:
- commandresult = self.upgradeExecutor.perform_stack_upgrade(command,
self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
+ commandresult = self.upgradeExecutor.perform_stack_upgrade(command,
self.commandInProgress['tmpout'],
+ self.commandInProgress['tmperr'])
else:
- commandresult = self.puppetExecutor.runCommand(command,
self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
- # dumping results
+ commandresult = self.puppetExecutor.runCommand(command,
self.commandInProgress['tmpout'],
+ self.commandInProgress['tmperr'])
+ # dumping results
self.commandInProgress = None
status = "COMPLETED"
if commandresult['exitcode'] != 0:
status = "FAILED"
-
- # assume some puppet pluing to run these commands
- roleResult = {'role' : command['role'],
- 'actionId' : commandId,
- 'taskId' : command['taskId'],
- 'stdout' : commandresult['stdout'],
- 'clusterName' : clusterName,
- 'stderr' : commandresult['stderr'],
- 'exitCode' : commandresult['exitcode'],
- 'serviceName' : serviceName,
- 'status' : status}
+
+ # assume some puppet plumbing to run these commands
+ roleResult = {'role': command['role'],
+ 'actionId': commandId,
+ 'taskId': command['taskId'],
+ 'stdout': commandresult['stdout'],
+ 'clusterName': clusterName,
+ 'stderr': commandresult['stderr'],
+ 'exitCode': commandresult['exitcode'],
+ 'serviceName': serviceName,
+ 'status': status,
+ 'roleCommand': roleCommand}
if roleResult['stdout'] == '':
roleResult['stdout'] = 'None'
if roleResult['stderr'] == '':
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py
(original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestActionQueue.py Mon
Apr 8 16:53:32 2013
@@ -227,7 +227,8 @@ class TestActionQueue(TestCase):
'status': 'COMPLETED',
'stderr': 'def',
'stdout': 'abc',
- 'taskId': 'taskId'}]
+ 'taskId': 'taskId',
+ 'roleCommand': 'UPGRADE'}]
self.assertEquals(result, expected_result)
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
(original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py Mon
Apr 8 16:53:32 2013
@@ -110,7 +110,8 @@ class TestHeartbeat(TestCase):
'exitCode' : 777,
'serviceName' : "serviceName",
'status' : 'IN_PROGRESS',
- 'configurations':{'global' : {}}
+ 'configurations':{'global' : {}},
+ 'roleCommand' : 'START'
}
heartbeat = Heartbeat(actionQueue)
result = heartbeat.build(100)
@@ -125,4 +126,5 @@ class TestHeartbeat(TestCase):
self.assertEquals(result['reports'][0]['exitCode'], 777)
self.assertEquals(result['reports'][0]['serviceName'], "serviceName")
self.assertEquals(result['reports'][0]['status'], "IN_PROGRESS")
+ self.assertEquals(result['reports'][0]['roleCommand'], "START")
pass
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java
Mon Apr 8 16:53:32 2013
@@ -24,15 +24,16 @@ import org.codehaus.jackson.annotate.Jso
public class CommandReport {
- String role;
- String actionId;
- String stdout;
- String stderr;
- String status;
+ private String role;
+ private String actionId;
+ private String stdout;
+ private String stderr;
+ private String status;
int exitCode;
private String clusterName;
private String serviceName;
private long taskId;
+ private String roleCommand;
private Map<String, Map<String, String>> configurationTags;
@@ -96,6 +97,16 @@ public class CommandReport {
this.stdout = stdout;
}
+ @JsonProperty("roleCommand")
+ public String getRoleCommand() {
+ return this.roleCommand;
+ }
+
+ @JsonProperty("roleCommand")
+ public void setRoleCommand(String roleCommand) {
+ this.roleCommand = roleCommand;
+ }
+
@JsonProperty("role")
public String getRole() {
return role;
@@ -129,7 +140,8 @@ public class CommandReport {
/**
* @param tags the config tags that match this command
*/
- public void setConfigTags(Map<String, Map<String,String>> tags) {
+ @JsonProperty("configurationTags")
+ public void setConfigurationTags(Map<String, Map<String,String>> tags) {
configurationTags = tags;
}
@@ -137,7 +149,8 @@ public class CommandReport {
* @return the config tags that match this command, or <code>null</code>
* if none are present
*/
- public Map<String, Map<String,String>> getConfigTags() {
+ @JsonProperty("configurationTags")
+ public Map<String, Map<String,String>> getConfigurationTags() {
return configurationTags;
}
@@ -151,6 +164,8 @@ public class CommandReport {
", clusterName='" + clusterName + '\'' +
", serviceName='" + serviceName + '\'' +
", taskId=" + taskId +
+ ", roleCommand=" + roleCommand +
+ ", configurationTags=" + configurationTags +
'}';
}
}
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
Mon Apr 8 16:53:32 2013
@@ -35,6 +35,8 @@ import org.apache.ambari.server.state.ho
import
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
import
org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
+import
org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
+import
org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.ambari.server.utils.VersionUtils;
import org.apache.commons.logging.Log;
@@ -57,8 +59,6 @@ public class HeartBeatHandler {
private final Clusters clusterFsm;
private final ActionQueue actionQueue;
private final ActionManager actionManager;
- private HeartbeatMonitor heartbeatMonitor;
-
@Inject
Injector injector;
@Inject
@@ -69,15 +69,15 @@ public class HeartBeatHandler {
ActionMetadata actionMetadata;
@Inject
HBaseMasterPortScanner scanner;
+ private HeartbeatMonitor heartbeatMonitor;
@Inject
private Gson gson;
-
private Map<String, Long> hostResponseIds = new HashMap<String, Long>();
private Map<String, HeartBeatResponse> hostResponses = new HashMap<String,
HeartBeatResponse>();
@Inject
public HeartBeatHandler(Clusters fsm, ActionQueue aq, ActionManager am,
- Injector injector) {
+ Injector injector) {
this.clusterFsm = fsm;
this.actionQueue = aq;
this.actionManager = am;
@@ -107,7 +107,7 @@ public class HeartBeatHandler {
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from host"
- + ", hostname=" + hostname
+ + ", hostname=" + hostname
+ ", currentResponseId=" + currentResponseId
+ ", receivedResponseId=" + heartbeat.getResponseId());
}
@@ -115,7 +115,7 @@ public class HeartBeatHandler {
if (heartbeat.getResponseId() == currentResponseId - 1) {
LOG.warn("Old responseId received - response was lost - returning cached
response");
return hostResponses.get(hostname);
- }else if (heartbeat.getResponseId() != currentResponseId) {
+ } else if (heartbeat.getResponseId() != currentResponseId) {
LOG.error("Error in responseId sequence - sending agent restart
command");
return createRestartCommand(currentResponseId);
}
@@ -138,7 +138,7 @@ public class HeartBeatHandler {
HostState hostState = hostObject.getState();
// If the host is waiting for component status updates, notify it
if (heartbeat.componentStatus.size() > 0
- &&
hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
+ &&
hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
try {
LOG.debug("Got component status updates");
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname,
now));
@@ -155,9 +155,9 @@ public class HeartBeatHandler {
hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now,
null));
}
- if(hostState != hostObject.getState())
scanner.updateHBaseMaster(hostObject);
+ if (hostState != hostObject.getState())
scanner.updateHBaseMaster(hostObject);
} catch (InvalidStateTransitionException ex) {
- LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
+ LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
return createRegisterCommand();
}
@@ -177,8 +177,8 @@ public class HeartBeatHandler {
protected void processCommandReports(HeartBeat heartbeat,
String hostname,
- Clusters clusterFsm, long now) throws
- AmbariException {
+ Clusters clusterFsm, long now)
+ throws AmbariException {
List<CommandReport> reports = heartbeat.getReports();
for (CommandReport report : reports) {
Cluster cl = clusterFsm.getCluster(report.getClusterName());
@@ -190,35 +190,45 @@ public class HeartBeatHandler {
LOG.info(report.getRole() + " is an action - skip component lookup");
} else {
try {
- if (null != report.getConfigTags() &&
!report.getConfigTags().isEmpty()) {
- cl.updateActualConfigs(hostname, report.getConfigTags());
+ if (null != report.getConfigurationTags() &&
!report.getConfigurationTags().isEmpty()) {
+ cl.updateActualConfigs(hostname, report.getConfigurationTags());
}
-
+
Service svc = cl.getService(service);
ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
ServiceComponentHost scHost =
svcComp.getServiceComponentHost(hostname);
String schName = scHost.getServiceComponentName();
State state = scHost.getState();
-
+
if (report.getStatus().equals("COMPLETED")) {
// Updating stack version, if needed
if (scHost.getState().equals(State.UPGRADING)) {
scHost.setStackVersion(scHost.getDesiredStackVersion());
+ } else if
(report.getRoleCommand().equals(RoleCommand.START.toString())
+ && null != report.getConfigurationTags()) {
+ LOG.info("Updating applied config on service " +
scHost.getServiceName() +
+ ", component " + scHost.getServiceComponentName() + ", host " +
scHost.getHostName());
+ scHost.updateActualConfigs(report.getConfigurationTags());
}
- else if (scHost.getState().equals(State.STARTING) && null !=
report.getConfigTags()) {
- scHost.updateActualConfigs(report.getConfigTags());
+
+ if (RoleCommand.START.toString().equals(report.getRoleCommand())) {
+ scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
+ hostname, now));
+ } else if
(RoleCommand.STOP.toString().equals(report.getRoleCommand())) {
+ scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
+ hostname, now));
+ } else {
+ scHost.handleEvent(new
ServiceComponentHostOpSucceededEvent(schName,
+ hostname, now));
}
-
- scHost.handleEvent(new
ServiceComponentHostOpSucceededEvent(schName,
- hostname, now));
} else if (report.getStatus().equals("FAILED")) {
scHost.handleEvent(new ServiceComponentHostOpFailedEvent(schName,
- hostname, now));
+ hostname, now));
} else if (report.getStatus().equals("IN_PROGRESS")) {
scHost.handleEvent(new
ServiceComponentHostOpInProgressEvent(schName,
- hostname, now));
+ hostname, now));
}
- if(state != scHost.getState() &&
schName.equals(Role.HBASE_MASTER.toString())) {
+ if (state != scHost.getState() &&
schName.equals(Role.HBASE_MASTER.toString())) {
scanner.updateHBaseMaster(cl);
}
} catch (ServiceComponentNotFoundException scnex) {
@@ -234,8 +244,8 @@ public class HeartBeatHandler {
protected void processStatusReports(HeartBeat heartbeat,
String hostname,
- Clusters clusterFsm) throws
- AmbariException {
+ Clusters clusterFsm)
+ throws AmbariException {
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
for (Cluster cl : clusters) {
for (ComponentStatus status : heartbeat.componentStatus) {
@@ -272,9 +282,10 @@ public class HeartBeatHandler {
if (null != status.getStackVersion() &&
!status.getStackVersion().isEmpty()) {
scHost.setStackVersion(gson.fromJson(status.getStackVersion(),
StackId.class));
}
-
+
if (null != status.getConfigTags()) {
scHost.updateActualConfigs(status.getConfigTags());
+ cl.updateActualConfigs(hostname, status.getConfigTags());
}
} else {
@@ -323,7 +334,7 @@ public class HeartBeatHandler {
* Adds commands from action queue to a heartbeat responce
*/
protected void sendCommands(String hostname, HeartBeatResponse response)
- throws AmbariException
{
+ throws AmbariException {
List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
if (cmds != null && !cmds.isEmpty()) {
for (AgentCommand ac : cmds) {
@@ -344,8 +355,8 @@ public class HeartBeatHandler {
break;
}
default:
- LOG.error("There is no action for agent command ="+
- ac.getCommandType().name() );
+ LOG.error("There is no action for agent command =" +
+ ac.getCommandType().name());
}
}
}
@@ -381,7 +392,7 @@ public class HeartBeatHandler {
}
public RegistrationResponse handleRegistration(Register register)
- throws InvalidStateTransitionException, AmbariException {
+ throws InvalidStateTransitionException, AmbariException {
String hostname = register.getHostname();
long now = System.currentTimeMillis();
@@ -389,15 +400,15 @@ public class HeartBeatHandler {
String serverVersion = ambariMetaInfo.getServerVersion();
if (!VersionUtils.areVersionsCompatible(serverVersion, agentVersion)) {
LOG.warn("Received registration request from host with non compatible"
- + " agent version"
- + ", hostname=" + hostname
- + ", agentVersion=" + agentVersion
- + ", serverVersion=" + serverVersion);
+ + " agent version"
+ + ", hostname=" + hostname
+ + ", agentVersion=" + agentVersion
+ + ", serverVersion=" + serverVersion);
throw new AmbariException("Cannot register host with non compatible"
- + " agent version"
- + ", hostname=" + hostname
- + ", agentVersion=" + agentVersion
- + ", serverVersion=" + serverVersion);
+ + " agent version"
+ + ", hostname=" + hostname
+ + ", agentVersion=" + agentVersion
+ + ", serverVersion=" + serverVersion);
}
String agentOsType = getOsType(register.getHardwareProfile().getOS(),
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
Mon Apr 8 16:53:32 2013
@@ -125,6 +125,10 @@ public abstract class ServiceComponentHo
return new ServiceComponentHostOpRestartedEvent(serviceComponentName,
hostName, opTimestamp);
case HOST_SVCCOMP_OP_SUCCEEDED:
return new ServiceComponentHostOpSucceededEvent(serviceComponentName,
hostName, opTimestamp);
+ case HOST_SVCCOMP_STOPPED:
+ return new ServiceComponentHostStoppedEvent(serviceComponentName,
hostName, opTimestamp);
+ case HOST_SVCCOMP_STARTED:
+ return new ServiceComponentHostStartedEvent(serviceComponentName,
hostName, opTimestamp);
case HOST_SVCCOMP_START:
return new ServiceComponentHostStartEvent(serviceComponentName,
hostName, opTimestamp, configs);
case HOST_SVCCOMP_STOP:
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
Mon Apr 8 16:53:32 2013
@@ -48,6 +48,14 @@ public enum ServiceComponentHostEventTyp
*/
HOST_SVCCOMP_STOP,
/**
+ * Start completed.
+ */
+ HOST_SVCCOMP_STARTED,
+ /**
+ * Stop completed.
+ */
+ HOST_SVCCOMP_STOPPED,
+ /**
* Triggering an uninstall.
*/
HOST_SVCCOMP_UNINSTALL,
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
Mon Apr 8 16:53:32 2013
@@ -239,7 +239,7 @@ public class ServiceComponentImpl implem
+ ", hostname=" + hostName);
}
ServiceComponentHost hostComponent =
- serviceComponentHostFactory.createNew(this, hostName, true);
+ serviceComponentHostFactory.createNew(this, hostName,
this.isClientComponent());
// FIXME need a better approach of caching components by host
ClusterImpl clusterImpl = (ClusterImpl) service.getCluster();
clusterImpl.addServiceComponentHost(hostComponent);
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
Mon Apr 8 16:53:32 2013
@@ -167,10 +167,22 @@ public class ServiceComponentHostImpl im
State.STOPPING,
ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
new ServiceComponentHostOpStartedTransition())
- .addTransition(State.INSTALLED,
+ .addTransition(State.INSTALLED,
State.UPGRADING,
ServiceComponentHostEventType.HOST_SVCCOMP_UPGRADE,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.INSTALLED,
+ State.INSTALLED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS,
+ new ServiceComponentHostOpInProgressTransition())
+ .addTransition(State.INSTALLED,
+ State.STARTED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STARTED,
+ new ServiceComponentHostOpCompletedTransition())
+ .addTransition(State.INSTALLED,
+ State.INSTALLED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STOPPED,
+ new ServiceComponentHostOpCompletedTransition())
.addTransition(State.STARTING,
State.STARTING,
@@ -181,10 +193,9 @@ public class ServiceComponentHostImpl im
State.STARTING,
ServiceComponentHostEventType.HOST_SVCCOMP_START,
new ServiceComponentHostOpStartedTransition())
-
.addTransition(State.STARTING,
State.STARTED,
- ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STARTED,
new ServiceComponentHostOpCompletedTransition())
.addTransition(State.STARTING,
@@ -206,9 +217,21 @@ public class ServiceComponentHostImpl im
new ServiceComponentHostOpStartedTransition())
.addTransition(State.STARTED,
+ State.STARTED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STARTED,
+ new ServiceComponentHostOpCompletedTransition())
+ .addTransition(State.STARTED,
State.STOPPING,
ServiceComponentHostEventType.HOST_SVCCOMP_STOP,
new ServiceComponentHostOpStartedTransition())
+ .addTransition(State.STARTED,
+ State.STARTED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_OP_IN_PROGRESS,
+ new ServiceComponentHostOpInProgressTransition())
+ .addTransition(State.STARTED,
+ State.INSTALLED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STOPPED,
+ new ServiceComponentHostOpCompletedTransition())
.addTransition(State.STOPPING,
State.STOPPING,
@@ -216,7 +239,7 @@ public class ServiceComponentHostImpl im
new ServiceComponentHostOpInProgressTransition())
.addTransition(State.STOPPING,
State.INSTALLED,
- ServiceComponentHostEventType.HOST_SVCCOMP_OP_SUCCEEDED,
+ ServiceComponentHostEventType.HOST_SVCCOMP_STOPPED,
new ServiceComponentHostOpCompletedTransition())
.addTransition(State.STOPPING,
State.STOP_FAILED,
@@ -563,6 +586,8 @@ public class ServiceComponentHostImpl im
break;
case HOST_SVCCOMP_OP_FAILED:
case HOST_SVCCOMP_OP_SUCCEEDED:
+ case HOST_SVCCOMP_STOPPED:
+ case HOST_SVCCOMP_STARTED:
setLastOpLastUpdateTime(time);
setLastOpEndTime(time);
break;
Added:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStartedEvent.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStartedEvent.java?rev=1465681&view=auto
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStartedEvent.java
(added)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStartedEvent.java
Mon Apr 8 16:53:32 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.svccomphost;
+
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
+
+public class ServiceComponentHostStartedEvent extends
+ ServiceComponentHostEvent {
+
+ public ServiceComponentHostStartedEvent(String serviceComponentName,
+ String hostName, long opTimestamp) {
+ super(ServiceComponentHostEventType.HOST_SVCCOMP_STARTED,
+ serviceComponentName, hostName, opTimestamp);
+ // TODO Auto-generated constructor stub
+ }
+
+}
Added:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStoppedEvent.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStoppedEvent.java?rev=1465681&view=auto
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStoppedEvent.java
(added)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostStoppedEvent.java
Mon Apr 8 16:53:32 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.svccomphost;
+
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
+
+public class ServiceComponentHostStoppedEvent extends
+ ServiceComponentHostEvent {
+ /** Builds an event of type HOST_SVCCOMP_STOPPED for the given component, host and operation timestamp. */
+ public ServiceComponentHostStoppedEvent(String serviceComponentName,
+ String hostName, long opTimestamp) {
+ super(ServiceComponentHostEventType.HOST_SVCCOMP_STOPPED,
+ serviceComponentName, hostName, opTimestamp);
+ // All arguments are passed to the superclass constructor; nothing else is initialized here.
+ }
+
+}
Modified:
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
Mon Apr 8 16:53:32 2013
@@ -206,7 +206,7 @@ public class TestHeartbeatHandler {
cr.setExitCode(215);
cr.setClusterName(DummyCluster);
- cr.setConfigTags(new HashMap<String, Map<String,String>>() {{
+ cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
put("global", new HashMap<String,String>() {{ put("tag", "version1");
}});
}});
@@ -375,7 +375,7 @@ public class TestHeartbeatHandler {
cr.setStdOut("");
cr.setExitCode(215);
- cr.setConfigTags(new HashMap<String, Map<String,String>>() {{
+ cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
put("global", new HashMap<String,String>() {{ put("tag", "version1");
}});
}});
@@ -394,7 +394,7 @@ public class TestHeartbeatHandler {
}
private void populateActionDB(ActionDBAccessor db, String DummyHostname1) {
- Stage s = new Stage(requestId, "/a/b", DummyCluster, "heartbat handler
test");
+ Stage s = new Stage(requestId, "/a/b", DummyCluster, "heartbeat handler
test");
s.setStageId(stageId);
String filename = null;
s.addHostRoleExecutionCommand(DummyHostname1, Role.HBASE_MASTER,
@@ -691,6 +691,128 @@ public class TestHeartbeatHandler {
assertEquals("Host state should still be installing", State.INSTALLING,
componentState1);
}
+ /**
+ * Tests the fact that when START and STOP commands are in progress, and
heartbeat
+ * forces the host component state to STARTED or INSTALLED, there are no
undesired
+ * side effects.
+ * @throws AmbariException
+ * @throws InvalidStateTransitionException
+ */
+ @Test
+ public void testCommandReportOnHeartbeatUpdatedState()
+ throws AmbariException, InvalidStateTransitionException {
+ ActionManager am = getMockActionManager();
+ Cluster cluster = getDummyCluster();
+
+ @SuppressWarnings("serial")
+ Set<String> hostNames = new HashSet<String>() {{
+ add(DummyHostname1);
+ }};
+ clusters.mapHostsToCluster(hostNames, DummyCluster);
+ Service hdfs = cluster.addService(HDFS);
+ hdfs.persist();
+ hdfs.addServiceComponent(DATANODE).persist();
+
hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+ ActionQueue aq = new ActionQueue();
+ HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+
+ ServiceComponentHost serviceComponentHost1 =
clusters.getCluster(DummyCluster).getService(HDFS).
+ getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+ serviceComponentHost1.setState(State.INSTALLED); // the scenario starts from INSTALLED
+
+ HeartBeat hb = new HeartBeat();
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(0);
+ hb.setHostname(DummyHostname1);
+ hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+
+ List<CommandReport> reports = new ArrayList<CommandReport>();
+ CommandReport cr = new CommandReport();
+ cr.setActionId(StageUtils.getActionId(requestId, stageId));
+ cr.setTaskId(1);
+ cr.setClusterName(DummyCluster);
+ cr.setServiceName(HDFS);
+ cr.setRole(DATANODE);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setStdErr("none");
+ cr.setStdOut("dummy output");
+ cr.setExitCode(777); // NOTE(review): 777 looks like a placeholder exit code for an unfinished command - confirm
+ cr.setRoleCommand("START");
+ reports.add(cr); // this single cr instance is mutated below and resent with every heartbeat
+ hb.setReports(reports);
+ hb.setComponentStatus(new ArrayList<ComponentStatus>());
+
+ handler.handleHeartBeat(hb); // START reported as IN_PROGRESS
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(1);
+ cr.setStatus(HostRoleStatus.COMPLETED.toString()); // START now reported as COMPLETED
+ cr.setExitCode(0);
+
+ handler.handleHeartBeat(hb);
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(2);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setRoleCommand("STOP"); // switch the report to a STOP command that is IN_PROGRESS
+ cr.setExitCode(777);
+
+ handler.handleHeartBeat(hb);
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ hb.setTimestamp(System.currentTimeMillis());
+ hb.setResponseId(3);
+ cr.setStatus(HostRoleStatus.COMPLETED.toString()); // STOP now reported as COMPLETED
+ cr.setExitCode(0);
+
+ handler.handleHeartBeat(hb);
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+
+ // validate the transitions from the in-progress states (STARTING/STOPPING), which are set directly here rather than reached through a heartbeat
+ serviceComponentHost1.setState(State.STARTING);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setExitCode(777);
+ cr.setRoleCommand("START");
+ hb.setResponseId(4);
+
+ handler.handleHeartBeat(hb); // START IN_PROGRESS while already STARTING
+ assertEquals("Host state should be " + State.STARTING,
+ State.STARTING, serviceComponentHost1.getState());
+
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+ hb.setResponseId(5);
+
+ handler.handleHeartBeat(hb); // START COMPLETED from STARTING
+ assertEquals("Host state should be " + State.STARTED,
+ State.STARTED, serviceComponentHost1.getState());
+
+ serviceComponentHost1.setState(State.STOPPING);
+ cr.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+ cr.setExitCode(777);
+ cr.setRoleCommand("STOP");
+ hb.setResponseId(6);
+
+ handler.handleHeartBeat(hb); // STOP IN_PROGRESS while already STOPPING
+ assertEquals("Host state should be " + State.STOPPING,
+ State.STOPPING, serviceComponentHost1.getState());
+
+ cr.setStatus(HostRoleStatus.COMPLETED.toString());
+ cr.setExitCode(0);
+ hb.setResponseId(7);
+
+ handler.handleHeartBeat(hb); // STOP COMPLETED from STOPPING
+ assertEquals("Host state should be " + State.INSTALLED,
+ State.INSTALLED, serviceComponentHost1.getState());
+ }
+
@Test
public void testUpgradeSpecificHandling() throws AmbariException,
InvalidStateTransitionException {
ActionManager am = getMockActionManager();
@@ -900,6 +1022,7 @@ public class TestHeartbeatHandler {
cr1.setStdErr("none");
cr1.setStdOut("dummy output");
cr1.setExitCode(0);
+ cr1.setRoleCommand(RoleCommand.UPGRADE.toString());
CommandReport cr2 = new CommandReport();
cr2.setActionId(StageUtils.getActionId(requestId, stageId));
@@ -911,6 +1034,7 @@ public class TestHeartbeatHandler {
cr2.setStdErr("none");
cr2.setStdOut("dummy output");
cr2.setExitCode(0);
+ cr2.setRoleCommand(RoleCommand.UPGRADE.toString());
ArrayList<CommandReport> reports = new ArrayList<CommandReport>();
reports.add(cr1);
reports.add(cr2);
Modified:
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java?rev=1465681&r1=1465680&r2=1465681&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java
Mon Apr 8 16:53:32 2013
@@ -348,9 +348,19 @@ public class ServiceComponentHostTest {
Assert.assertEquals(inProgressState,
impl.getState());
- ServiceComponentHostOpSucceededEvent succeededEvent = new
- ServiceComponentHostOpSucceededEvent(impl.getServiceComponentName(),
- impl.getHostName(), ++timestamp);
+ ServiceComponentHostEvent succeededEvent;
+ if (startEventType == ServiceComponentHostEventType.HOST_SVCCOMP_START) {
+ succeededEvent = new
ServiceComponentHostStartedEvent(impl.getServiceComponentName(),
+ impl.getHostName(), ++timestamp);
+ } else if (startEventType ==
ServiceComponentHostEventType.HOST_SVCCOMP_STOP) {
+ succeededEvent = new
ServiceComponentHostStoppedEvent(impl.getServiceComponentName(),
+ impl.getHostName(), ++timestamp);
+ } else {
+ succeededEvent = new
+ ServiceComponentHostOpSucceededEvent(impl.getServiceComponentName(),
+ impl.getHostName(), ++timestamp);
+ }
+
endTime = timestamp;
impl.handleEvent(succeededEvent);
Assert.assertEquals(startTime, impl.getLastOpStartTime());