Repository: ambari
Updated Branches:
  refs/heads/trunk 6a91834c2 -> a65ae123b
AMBARI-6772. Flume: agent alerts should show a combination of host and agent name (ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a65ae123 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a65ae123 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a65ae123 Branch: refs/heads/trunk Commit: a65ae123b1081d7f7dfd5c121585e9d08992eb33 Parents: 6a91834 Author: Nate Cole <[email protected]> Authored: Thu Aug 7 11:49:44 2014 -0400 Committer: Nate Cole <[email protected]> Committed: Thu Aug 7 17:19:44 2014 -0400 ---------------------------------------------------------------------- .../ambari/server/agent/HeartbeatMonitor.java | 1 + .../ambari/server/agent/StatusCommand.java | 17 ++- .../FLUME/package/scripts/flume_handler.py | 48 ++++--- .../services/FLUME/package/scripts/params.py | 4 + .../server/agent/TestHeartbeatMonitor.java | 2 + .../python/stacks/2.0.6/FLUME/test_flume.py | 125 ++++++++++++++++++- 6 files changed, 179 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java index fbab34a..9eab651 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java @@ -311,6 +311,7 @@ public class HeartbeatMonitor implements Runnable { statusCmd.setComponentName(componentName); statusCmd.setConfigurations(configurations); statusCmd.setConfigurationAttributes(configurationAttributes); + statusCmd.setHostname(hostname); // Fill command 
params Map<String, String> commandParams = statusCmd.getCommandParams(); http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java index a7da22b..9ac8bed 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java @@ -18,12 +18,12 @@ package org.apache.ambari.server.agent; import java.util.HashMap; -import java.util.List; import java.util.Map; -import com.google.gson.annotations.SerializedName; import org.codehaus.jackson.annotate.JsonProperty; +import com.google.gson.annotations.SerializedName; + /** * Command to report the status of a list of services in roles. 
*/ @@ -41,6 +41,8 @@ public class StatusCommand extends AgentCommand { private Map<String, Map<String, Map<String, String>>> configurationAttributes; private Map<String, String> commandParams = new HashMap<String, String>(); private Map<String, String> hostLevelParams = new HashMap<String, String>(); + private String hostname = null; + @JsonProperty("clusterName") public String getClusterName() { @@ -111,5 +113,16 @@ public class StatusCommand extends AgentCommand { public void setCommandParams(Map<String, String> commandParams) { this.commandParams = commandParams; } + + @JsonProperty("hostname") + public void setHostname(String hostname) { + this.hostname = hostname; + } + @JsonProperty("hostname") + public String getHostname() { + return hostname; + } + + } http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py index adc7a35..36bc889 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py @@ -63,29 +63,47 @@ class FlumeHandler(Script): json['processes'] = processes json['alerts'] = [] + alert = {} + alert['name'] = 'flume_agent' + alert['label'] = 'Flume Agent process' + if len(processes) == 0 and len(expected_agents) == 0: - alert = {} - alert['name'] = 'flume_agent' - alert['label'] = 'Flume Agent process' alert['state'] = 'WARNING' - alert['text'] = 'No agents defined' - json['alerts'].append(alert) + + if not params.hostname is None: + alert['text'] = 'No agents defined on ' + params.hostname + else: + alert['text'] = 'No 
agents defined' + else: - for proc in processes: - alert = {} - alert['name'] = 'flume_agent' - alert['instance'] = proc['name'] - alert['label'] = 'Flume Agent process' + crit = [] + ok = [] + for proc in processes: if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING': - alert['state'] = 'CRITICAL' - alert['text'] = 'Flume agent {0} not running'.format(proc['name']) + crit.append(proc['name']) else: - alert['state'] = 'OK' - alert['text'] = 'Flume agent {0} is running'.format(proc['name']) + ok.append(proc['name']) + + text_arr = [] + + if len(crit) > 0: + text_arr.append("{0} {1} NOT running".format(", ".join(crit), + "is" if len(crit) == 1 else "are")) + + if len(ok) > 0: + text_arr.append("{0} {1} running".format(", ".join(ok), + "is" if len(ok) == 1 else "are")) + + plural = len(crit) > 1 or len(ok) > 1 + alert['text'] = "Agent{0} {1} {2}".format( + "s" if plural else "", + " and ".join(text_arr), + "" if params.hostname is None else "on " + str(params.hostname)) - json['alerts'].append(alert) + alert['state'] = 'CRITICAL' if len(crit) > 0 else 'OK' + json['alerts'].append(alert) self.put_structured_out(json) # only throw an exception if there are agents defined and there is a http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py index e26b74d..bcf4d0d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py @@ -54,3 +54,7 @@ ganglia_server_hosts = default('/clusterHostInfo/ganglia_server_host', []) ganglia_server_host = None if 0 != 
len(ganglia_server_hosts): ganglia_server_host = ganglia_server_hosts[0] + +hostname = None +if config.has_key('hostname'): + hostname = config['hostname'] http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java index 8ec1ed8..847a34d 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java @@ -295,6 +295,8 @@ public class TestHeartbeatMonitor { assertTrue("HeartbeatMonitor should generate StatusCommands for host2, " + "even if it has only client components", cmds.size() == 1); assertTrue(cmds.get(0).getComponentName().equals(Role.HDFS_CLIENT.name())); + assertEquals(hostname2, cmds.get(0).getHostname()); + } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/a65ae123/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py index 78f4246..c1f48d0 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py +++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py @@ -98,7 +98,7 @@ class TestFlumeHandler(RMFTestCase): # test that the method was called with empty processes self.assertTrue(structured_out_mock.called) structured_out_mock.assert_called_with({'processes': [], - 'alerts': [{'text': 'No agents defined', 'state': 'WARNING', 'name': 'flume_agent', 'label': 'Flume Agent process'}]}) + 'alerts': [{'text': 'No agents defined on 
c6401.ambari.apache.org', 'state': 'WARNING', 'name': 'flume_agent', 'label': 'Flume Agent process'}]}) self.assertNoMoreResources() @patch("resource_management.libraries.script.Script.put_structured_out") @@ -296,6 +296,129 @@ class TestFlumeHandler(RMFTestCase): self.assert_configure_default() self.assertNoMoreResources() + @patch("resource_management.libraries.script.Script.put_structured_out") + @patch("flume.find_expected_agent_names") + @patch("flume.flume_status") + def test_status_many_mixed(self, status_mock, expected_names_mock, structured_out_mock): + expected_names_mock.return_value = ["a1", "a2"] + status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}, {'name': 'a2', 'status': 'NOT_RUNNING'}] + + try: + self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "status", + config_file="default.json") + except: + # expected since ComponentIsNotRunning gets raised + pass + + self.assertTrue(structured_out_mock.called) + + # call_args[0] is a tuple, whose first element is the actual call argument + struct_out = structured_out_mock.call_args[0][0] + self.assertTrue(struct_out.has_key('processes')) + self.assertTrue(struct_out.has_key('alerts')) + self.assertTrue('Agent a2 is NOT running and a1 is running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text']) + self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state']) + self.assertNoMoreResources() + + @patch("resource_management.libraries.script.Script.put_structured_out") + @patch("flume.find_expected_agent_names") + @patch("flume.flume_status") + def test_status_many_ok(self, status_mock, expected_names_mock, structured_out_mock): + expected_names_mock.return_value = ["a1", "a2"] + status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}, {'name': 'a2', 'status': 'RUNNING'}] + + self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "status", + 
config_file="default.json") + + self.assertTrue(structured_out_mock.called) + + # call_args[0] is a tuple, whose first element is the actual call argument + struct_out = structured_out_mock.call_args[0][0] + self.assertTrue(struct_out.has_key('processes')) + self.assertTrue(struct_out.has_key('alerts')) + self.assertTrue('Agents a1, a2 are running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text']) + self.assertTrue('OK' == struct_out['alerts'][0]['state']) + self.assertNoMoreResources() + + @patch("resource_management.libraries.script.Script.put_structured_out") + @patch("flume.find_expected_agent_names") + @patch("flume.flume_status") + def test_status_many_critical(self, status_mock, expected_names_mock, structured_out_mock): + expected_names_mock.return_value = ["a1", "a2"] + status_mock.return_value = [{'name': 'a1', 'status': 'NOT_RUNNING'}, {'name': 'a2', 'status': 'NOT_RUNNING'}] + + try: + self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "status", + config_file="default.json") + except: + # expected since ComponentIsNotRunning gets raised + pass + + self.assertTrue(structured_out_mock.called) + + # call_args[0] is a tuple, whose first element is the actual call argument + struct_out = structured_out_mock.call_args[0][0] + self.assertTrue(struct_out.has_key('processes')) + self.assertTrue(struct_out.has_key('alerts')) + self.assertTrue('Agents a1, a2 are NOT running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text']) + self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state']) + self.assertNoMoreResources() + + + @patch("resource_management.libraries.script.Script.put_structured_out") + @patch("flume.find_expected_agent_names") + @patch("flume.flume_status") + def test_status_single_ok(self, status_mock, expected_names_mock, structured_out_mock): + expected_names_mock.return_value = ["a1"] + status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}] + + 
self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "status", + config_file="default.json") + + self.assertTrue(structured_out_mock.called) + + # call_args[0] is a tuple, whose first element is the actual call argument + struct_out = structured_out_mock.call_args[0][0] + self.assertTrue(struct_out.has_key('processes')) + self.assertTrue(struct_out.has_key('alerts')) + self.assertTrue('Agent a1 is running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text']) + self.assertTrue('OK' == struct_out['alerts'][0]['state']) + self.assertNoMoreResources() + + @patch("resource_management.libraries.script.Script.put_structured_out") + @patch("flume.find_expected_agent_names") + @patch("flume.flume_status") + def test_status_single_critical(self, status_mock, expected_names_mock, structured_out_mock): + expected_names_mock.return_value = ['a1'] + status_mock.return_value = [{'name': 'a1', 'status': 'NOT_RUNNING'}] + + try: + self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "status", + config_file="default.json") + except: + # expected since ComponentIsNotRunning gets raised + pass + + self.assertTrue(structured_out_mock.called) + + # call_args[0] is a tuple, whose first element is the actual call argument + struct_out = structured_out_mock.call_args[0][0] + self.assertTrue(struct_out.has_key('processes')) + self.assertTrue(struct_out.has_key('alerts')) + self.assertTrue('Agent a1 is NOT running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text']) + self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state']) + self.assertNoMoreResources() + def build_flume(content): result = {} agent_names = []
