Author: mahadev
Date: Mon Jan 21 18:20:17 2013
New Revision: 1436531
URL: http://svn.apache.org/viewvc?rev=1436531&view=rev
Log:
AMBARI-1201. Improve Agent Registration and Heartbeat json. (Nate Cole via
mahadev)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Controller.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/HostInfo.py
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Register.py
incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Mon Jan 21 18:20:17 2013
@@ -130,6 +130,9 @@ Trunk (unreleased changes):
AMBARI-1231. Replace sudo with su in the ambari setup script since ambari
server setup is already run as root. (mahadev)
+ AMBARI-1201. Improve Agent Registration and Heartbeat json. (Nate Cole via
+ mahadev)
+
AMBARI-1.2.0 branch:
INCOMPATIBLE CHANGES
Modified: incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini (original)
+++ incubator/ambari/trunk/ambari-agent/conf/unix/ambari-agent.ini Mon Jan 21
18:20:17 2013
@@ -40,3 +40,13 @@ passphrase_env_var_name=AMBARI_PASSPHRAS
[services]
pidLookupPath=/var/run/
+
+[heartbeat]
+# Every state_interval-th heartbeat (by response id) also collects host
+# information; 0 disables the extra collection.
+state_interval=6
+# Comma-separated paths; each is reported in agentEnv "paths" with its type.
+dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
+    /etc/sqoop,/etc/ganglia,/etc/nagios,
+    /var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie,
+    /var/log/hadoop,/var/log/zookeeper,/var/log/hbase,/var/log/templeton,/var/log/hive,
+    /var/log/nagios
+# Comma-separated package names probed with `rpm -q`; reported in agentEnv "rpms".
+rpms=yum,rpm,openssl,curl,wget,net-snmp,ntpd,ruby,puppet,nagios,ganglia,passenger,
+    hadoop,hbase,oozie,sqoop,pig,zookeeper,hive,libconfuse,postgresql,httpd,apache2,http-server
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
Mon Jan 21 18:20:17 2013
@@ -53,6 +53,11 @@ sleepBetweenRetries=1
keysdir=/tmp/ambari-agent
server_crt=ca.crt
passphrase_env_var_name=AMBARI_PASSPHRASE
+
+[heartbeat]
+state_interval = 6
+dirs=/etc/hadoop,/etc/hadoop/conf,/var/run/hadoop,/var/log/hadoop
+rpms=hadoop,openssl,wget,net-snmp,ntpd,ruby,ganglia,nagios
"""
s = StringIO.StringIO(content)
config.readfp(s)
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Controller.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Controller.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Controller.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Controller.py
Mon Jan 21 18:20:17 2013
@@ -135,12 +135,15 @@ class Controller(threading.Thread):
retry = False
certVerifFailed = False
+ config = AmbariConfig.config
+ hb_interval = config.get('heartbeat', 'state_interval')
+
#TODO make sure the response id is monotonically increasing
id = 0
while not self.DEBUG_STOP_HEARTBITTING:
try:
if not retry:
- data = json.dumps(self.heartbeat.build(self.responseId))
+ data = json.dumps(self.heartbeat.build(self.responseId,
int(hb_interval)))
pass
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Heartbeat.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
Mon Jan 21 18:20:17 2013
@@ -54,7 +54,7 @@ class Heartbeat:
'hostname' : socket.getfqdn(),
'nodeStatus' : nodeStatus
}
- if (int(id) >= 0) and (int(id) % state_interval) == 0:
+ if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) ==
0:
hostInfo = HostInfo()
nodeInfo = { }
# for now, just do the same work as registration
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/HostInfo.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/HostInfo.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/HostInfo.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/HostInfo.py
Mon Jan 21 18:20:17 2013
@@ -21,6 +21,8 @@ limitations under the License.
import os
import glob
import pwd
+import subprocess
+import AmbariConfig
class HostInfo:
@@ -31,10 +33,33 @@ class HostInfo:
return 'sym_link'
elif os.path.isdir(path):
return 'directory'
+ elif os.path.isfile(path):
+ return 'file'
return 'unknown'
- def hadoopDir(self):
- return self.dirType('/etc/hadoop')
+  def rpmInfo(self, rpmList):
+    """Append one dict per package named in the [heartbeat] 'rpms' option.
+
+    Every dict carries 'name' and 'installed'; 'version' (the stripped
+    `rpm -q` output) is added only when the package is installed. These
+    keys match the server-side AgentEnv.Rpm bean (name/installed/version),
+    so no other keys may be emitted here.
+
+    rpmList -- list to which the package dicts are appended in place.
+    """
+    config = AmbariConfig.config
+
+    try:
+      rpmNames = config.get('heartbeat', 'rpms').split(',')
+    except Exception:
+      # best effort: without a package list there is simply nothing to report
+      return
+
+    for rpmName in rpmNames:
+      rpmName = rpmName.strip()
+      if not rpmName:
+        # tolerate stray/trailing commas in the configured list
+        continue
+      rpm = { }
+      rpm['name'] = rpmName
+      rpm['installed'] = False
+
+      try:
+        osStat = subprocess.Popen(["rpm", "-q", rpmName],
+          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        out = out.strip()
+        if 0 == osStat.returncode and 0 != len(out):
+          rpm['installed'] = True
+          rpm['version'] = out
+      except Exception:
+        # `rpm` could not be executed on this host; keep installed=False
+        # instead of inventing a key the server does not know about
+        pass
+
+      rpmList.append(rpm)
def hadoopVarRunCount(self):
if not os.path.exists('/var/run/hadoop'):
@@ -47,60 +72,88 @@ class HostInfo:
return 0
logs = glob.glob('/var/log/hadoop/*/*.log')
return len(logs)
+
+  def etcAlternativesConf(self, etcList):
+    """Append a {'name', 'target'} dict for each /etc/alternatives/*conf entry.
+
+    'target' is the fully resolved path when the entry is a symlink,
+    otherwise the entry path itself.
+
+    etcList -- list that is filled in place; nothing is returned.
+    """
+    if not os.path.exists('/etc/alternatives'):
+      # results go into etcList, so there is no value to return here
+      return
+    confs = glob.glob('/etc/alternatives/*conf')
+
+    for conf in confs:
+      confinfo = { }
+      realconf = conf
+      if os.path.islink(conf):
+        realconf = os.path.realpath(conf)
+      confinfo['name'] = conf
+      confinfo['target'] = realconf
+      etcList.append(confinfo)
- def dirHelper(self, dict, name, prefix):
- dict[name] = self.dirType(os.path.join(prefix, name))
+  def repos(self):
+    """Return the host's package repository listing as raw command output.
+
+    Tries `yum -C repolist` (CentOS/RHEL) first, then `zypper repos` (SUSE).
+    A command's output is used only if it exited with status 0 and printed
+    something; otherwise the next command is tried. Falls back to
+    "could_not_determine" so the result is never empty.
+    """
+    for cmd in (["yum", "-C", "repolist"], ["zypper", "repos"]):
+      try:
+        osStat = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+          stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+      except Exception:
+        # tool not available on this distribution; try the next one
+        continue
+      if 0 == osStat.returncode and out.strip():
+        return out
+
+    # default, never return empty
+    return "could_not_determine"
+
def register(self, dict):
dict['varLogHadoopLogCount'] = self.hadoopVarLogCount()
dict['varRunHadoopPidCount'] = self.hadoopVarRunCount()
-
- etcDirs = { }
- self.dirHelper(etcDirs, 'hadoop', '/etc')
- etcDirs['hadoop_conf'] = self.dirType('/etc/hadoop/conf')
- self.dirHelper(etcDirs, 'hbase', '/etc')
- self.dirHelper(etcDirs, 'hcatalog', '/etc')
- self.dirHelper(etcDirs, 'hive', '/etc')
- self.dirHelper(etcDirs, 'oozie', '/etc')
- self.dirHelper(etcDirs, 'sqoop', '/etc')
- self.dirHelper(etcDirs, 'ganglia', '/etc')
- self.dirHelper(etcDirs, 'nagios', '/etc')
- dict['etcDirs'] = etcDirs
- varRunDirs = { }
- self.dirHelper(varRunDirs, 'hadoop', '/var/run')
- self.dirHelper(varRunDirs, 'zookeeper', '/var/run')
- self.dirHelper(varRunDirs, 'hbase', '/var/run')
- self.dirHelper(varRunDirs, 'templeton', '/var/run')
- self.dirHelper(varRunDirs, 'oozie', '/var/run')
- dict['varRunDirs'] = varRunDirs
-
- varLogDirs = { }
- self.dirHelper(varLogDirs, 'hadoop', '/var/log')
- self.dirHelper(varLogDirs, 'zookeeper', '/var/log')
- self.dirHelper(varLogDirs, 'hbase', '/var/log')
- self.dirHelper(varLogDirs, 'hive', '/var/log')
- self.dirHelper(varLogDirs, 'templeton', '/var/log')
- self.dirHelper(varLogDirs, 'nagios', '/var/log')
- dict['varLogDirs'] = varLogDirs
+ etcs = []
+ self.etcAlternativesConf(etcs)
+ dict['etcAlternativesConf'] = etcs
+
+ dirs = []
+ config = AmbariConfig.config
+ try:
+ for dirName in config.get('heartbeat', 'dirs').split(','):
+ obj = { }
+ obj['type'] = self.dirType(dirName.strip())
+ obj['name'] = dirName.strip()
+ dirs.append(obj)
+ except:
+ pass
+
+ dict['paths'] = dirs
java = []
- self.hadoopJava(java)
- dict['hadoopJavaProcs'] = java
+ self.javaProcs(java)
+ dict['javaProcs'] = java
+
+ rpms = []
+ self.rpmInfo(rpms)
+ dict['rpms'] = rpms
+
+ dict['repoInfo'] = self.repos()
- def hadoopJava(self, list):
+ def javaProcs(self, list):
try:
pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
for pid in pids:
cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
- if 'java' in cmd and 'hadoop' in cmd:
+ cmd = cmd.replace('\0', ' ')
+ if 'java' in cmd:
+ dict = { }
+ dict['pid'] = int(pid)
+ dict['hadoop'] = True if 'hadoop' in cmd else False
+ dict['command'] = cmd.strip()
for line in open(os.path.join('/proc', pid, 'status')):
if line.startswith('Uid:'):
uid = int(line.split()[1])
- dict = { }
dict['user'] = pwd.getpwuid(uid).pw_name
- dict['pid'] = int(pid)
- list.append(dict)
+ list.append(dict)
except:
pass
pass
Modified:
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Register.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Register.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Register.py
(original)
+++
incubator/ambari/trunk/ambari-agent/src/main/python/ambari_agent/Register.py
Mon Jan 21 18:20:17 2013
@@ -61,7 +61,6 @@ class Register:
'hardwareProfile' : self.hardware.get(),
'agentEnv' : agentEnv
}
- print str(time.time())
return register
def doExec(vals, key, command, preLF=False):
Modified: incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py
(original)
+++ incubator/ambari/trunk/ambari-agent/src/test/python/TestHeartbeat.py Mon
Jan 21 18:20:17 2013
@@ -43,7 +43,8 @@ class TestHeartbeat(TestCase):
self.assertEquals(len(result['nodeStatus']), 2)
self.assertEquals(result['nodeStatus']['cause'], "NONE")
self.assertEquals(result['nodeStatus']['status'], "HEALTHY")
- self.assertEquals(len(result), 7)
+ # result may or may NOT have an agentEnv structure in it
+ self.assertEquals((len(result) is 6) or (len(result) is 7), True)
self.assertEquals(not heartbeat.reports, True, "Heartbeat should not
contain task in progress")
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java
Mon Jan 21 18:20:17 2013
@@ -17,61 +17,57 @@
*/
package org.apache.ambari.server.agent;
-import java.util.Map;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
/**
* Agent environment data.
*/
public class AgentEnv {
+
/**
- * Various directories in /etc
- */
- private Map<String, String> etcDirs = null;
- /**
- * Various directories in /var/run
+ * Various directories, configurable in <code>ambari-agent.ini</code>
*/
- private Map<String, String> varRunDirs = null;
+ private Directory[] paths = new Directory[0];
+
/**
- * Various directories in /var/log
+ * Java processes running on the system. Default empty array.
*/
- private Map<String, String> varLogDirs = null;
-
+ private JavaProc[] javaProcs = new JavaProc[0];
+
/**
- * Java processes with the word "hadoop" in them, users and pids
+ * Various RPM package versions.
*/
- private JavaProc[] hadoopJavaProcs = null;
+ private Rpm[] rpms = new Rpm[0];
+
/**
- * Number of pid files found in /var/run/hadoop
+ * Number of pid files found in <code>/var/run/hadoop</code>
*/
private int varRunHadoopPidCount = 0;
+
/**
- * Number of log files found in /var/log/hadoop
+ * Number of log files found in <code>/var/log/hadoop</code>
*/
private int varLogHadoopLogCount = 0;
+ /**
+ * Directories that match name <code>/etc/alternatives/*conf</code>
+ */
+ private Alternative[] etcAlternativesConf = new Alternative[0];
+
+ /**
+ * Output for repo listing. Command to do this varies, but for RHEL it is
+ * <code>yum -C repolist</code>
+ */
+ private String repoInfo;
- public Map<String, String> getEtcDirs() {
- return etcDirs;
- }
-
- public void setEtcDirs(Map<String, String> dirs) {
- etcDirs = dirs;
- }
-
- public Map<String, String> getVarRunDirs() {
- return varRunDirs;
- }
-
- public void setVarRunDirs(Map<String, String> dirs) {
- varRunDirs = dirs;
- }
-
- public Map<String, String> getVarLogDirs() {
- return varLogDirs;
+
+ public Directory[] getPaths() {
+ return paths;
}
- public void setVarLogDirs(Map<String, String> dirs) {
- varLogDirs = dirs;
+ public void setPaths(Directory[] dirs) {
+ paths = dirs;
}
public void setVarRunHadoopPidCount(int count) {
@@ -90,17 +86,104 @@ public class AgentEnv {
return varLogHadoopLogCount;
}
- public void setHadoopJavaProcs(JavaProc[] procs) {
- hadoopJavaProcs = procs;
+ public void setJavaProcs(JavaProc[] procs) {
+ javaProcs = procs;
+ }
+
+ public JavaProc[] getJavaProcs() {
+ return javaProcs;
+ }
+
+ public void setRpms(Rpm[] rpm) {
+ rpms = rpm;
+ }
+
+ public Rpm[] getRpms() {
+ return rpms;
+ }
+
+ public void setEtcAlternativesConf(Alternative[] dirs) {
+ etcAlternativesConf = dirs;
+ }
+
+ public Alternative[] getEtcAlternativesConf() {
+ return etcAlternativesConf;
+ }
+
+ public void setRepoInfo(String info) {
+ repoInfo = info;
+ }
+
+ public String getRepoInfo() {
+ return repoInfo;
+ }
+
+ /**
+ * Represents information about rpm-installed packages
+ */
+ public static class Rpm {
+ private String rpmName;
+ private boolean rpmInstalled = false;
+ private String rpmVersion;
+
+ public void setName(String name) {
+ rpmName = name;
+ }
+
+ public String getName() {
+ return rpmName;
+ }
+
+ public void setInstalled(boolean installed) {
+ rpmInstalled = installed;
+ }
+
+ public boolean isInstalled() {
+ return rpmInstalled;
+ }
+
+ public void setVersion(String version) {
+ rpmVersion = version;
+ }
+
+ @JsonSerialize(include=Inclusion.NON_NULL)
+ public String getVersion() {
+ return rpmVersion;
+ }
}
- public JavaProc[] getHadoopJavaProcs() {
- return hadoopJavaProcs;
+ /**
+ * Represents information about a directory of interest.
+ */
+ public static class Directory {
+ private String dirName;
+ private String dirType;
+
+ public void setName(String name) {
+ dirName = name;
+ }
+
+ public String getName() {
+ return dirName;
+ }
+
+ public void setType(String type) {
+ dirType = type;
+ }
+
+ public String getType() {
+ return dirType;
+ }
}
+ /**
+ * Represents information about running java processes.
+ */
public static class JavaProc {
private String user;
- private int pid;
+ private int pid = 0;
+ private boolean is_hadoop = false;
+ private String command;
public void setUser(String user) {
this.user = user;
@@ -117,6 +200,43 @@ public class AgentEnv {
public int getPid() {
return pid;
}
+
+ public void setHadoop(boolean hadoop) {
+ is_hadoop = hadoop;
+ }
+
+ public boolean isHadoop() {
+ return is_hadoop;
+ }
+
+ public void setCommand(String cmd) {
+ command = cmd;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+ }
+
+ public static class Alternative {
+ private String altName;
+ private String altTarget;
+
+ public void setName(String name) {
+ altName = name;
+ }
+
+ public String getName() {
+ return altName;
+ }
+
+ public void setTarget(String target) {
+ altTarget = target;
+ }
+
+ public String getTarget() {
+ return altTarget;
+ }
}
}
Modified:
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java?rev=1436531&r1=1436530&r2=1436531&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
Mon Jan 21 18:20:17 2013
@@ -29,6 +29,7 @@ import junit.framework.Assert;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.AgentEnv;
+import org.apache.ambari.server.agent.AgentEnv.Directory;
import org.apache.ambari.server.agent.DiskInfo;
import org.apache.ambari.server.agent.HostInfo;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -143,17 +144,15 @@ public class ClusterTest {
hostInfo.setMounts(mounts);
AgentEnv agentEnv = new AgentEnv();
- Map<String, String> etcDirs = new HashMap<String, String>();
- etcDirs.put("hadoop", "not_exist");
- agentEnv.setEtcDirs(etcDirs);
- Map<String, String> varRunDirs = new HashMap<String, String>();
- varRunDirs.put("hadoop", "not_exist");
- agentEnv.setVarRunDirs(varRunDirs);
+ Directory dir1 = new Directory();
+ dir1.setName("/etc/hadoop");
+ dir1.setType("not_exist");
+ Directory dir2 = new Directory();
+ dir2.setName("/var/log/hadoop");
+ dir2.setType("not_exist");
+ agentEnv.setPaths(new Directory[] { dir1, dir2 });
- Map<String, String> varLogDirs = new HashMap<String, String>();
- varLogDirs.put("hadoop", "not_exist");
- agentEnv.setVarLogDirs(varLogDirs);
AgentVersion agentVersion = new AgentVersion("0.0.x");
long currentTime = 1001;