Repository: ambari Updated Branches: refs/heads/trunk f0c31d29a -> bc5d82cad
AMBARI-5574 Need some logging when JSON parsing throws an error (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bc5d82ca Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bc5d82ca Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bc5d82ca Branch: refs/heads/trunk Commit: bc5d82cad01f484195ddb30bdef60aa684856da4 Parents: f0c31d2 Author: Dmitry Sen <[email protected]> Authored: Fri Apr 25 20:16:03 2014 +0300 Committer: Dmitry Sen <[email protected]> Committed: Fri Apr 25 20:36:20 2014 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 29 ++++++---- .../src/main/python/ambari_agent/security.py | 8 ++- .../test/python/ambari_agent/TestController.py | 58 +++++++++++++------- .../test/python/ambari_agent/TestSecurity.py | 39 ++++++++++++- 4 files changed, 100 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index f1099ae..9839313 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -88,8 +88,7 @@ class Controller(threading.Thread): try: data = json.dumps(self.register.build(id)) logger.info("Registering with the server " + pprint.pformat(data)) - response = self.sendRequest(self.registerUrl, data) - ret = json.loads(response) + ret = self.sendRequest(self.registerUrl, data) exitstatus = 0 # exitstatus is a code of error which was rised on server side. # exitstatus = 0 (OK - Default) @@ -177,7 +176,6 @@ class Controller(threading.Thread): else: self.DEBUG_HEARTBEAT_RETRIES += 1 response = self.sendRequest(self.heartbeatUrl, data) - response = json.loads(response) logger.debug('Got server response: ' + pprint.pformat(response)) @@ -283,17 +281,28 @@ class Controller(threading.Thread): pass def sendRequest(self, url, data): - if self.cachedconnect is None: # Lazy initialization - self.cachedconnect = security.CachedHTTPSConnection(self.config) - req = urllib2.Request(url, data, {'Content-Type': 'application/json'}) - response = self.cachedconnect.request(req) - return response + try: + if self.cachedconnect is None: # Lazy initialization + self.cachedconnect = security.CachedHTTPSConnection(self.config) + req = urllib2.Request(url, data, {'Content-Type': 'application/json'}) + response = None + response = self.cachedconnect.request(req) + return json.loads(response) + except Exception: + if response is None: + err_msg = 'Request failed! Data: ' + str(data) + logger.warn(err_msg) + return {'exitstatus': 1, 'log': err_msg} + else: + err_msg = ('Response parsing failed! Request data: ' + str(data) + + '; Response: ' + str(response)) + logger.warn(err_msg) + return {'exitstatus': 1, 'log': err_msg} def updateComponents(self, cluster_name): logger.info("Updating components map of cluster " + cluster_name) response = self.sendRequest(self.componentsUrl + cluster_name, None) - logger.debug("Response from server = " + response) - response = json.loads(response) + logger.debug("Response from server = " + str(response)) for service, components in response['components'].items(): LiveStatus.SERVICES.append(service) for component, category in components.items(): http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index 3052245..9801dec 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -208,8 +208,12 @@ class CertificateManager(): f = urllib2.urlopen(req) response = f.read() f.close() - data = json.loads(response) - logger.debug("Sign response from Server: \n" + pprint.pformat(data)) + try: + data = json.loads(response) + logger.debug("Sign response from Server: \n" + pprint.pformat(data)) + except Exception: + logger.warn("Malformed response! data: " + str(data)) + data = {'result': 'ERROR'} result=data['result'] if result == 'OK': agentCrtContent=data['signedCa'] http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 000cb3f..daa7b82 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -80,18 +80,18 @@ class TestController(unittest.TestCase): self.controller.sendRequest = MagicMock() - dumpsMock.return_value = "request" - self.controller.sendRequest.return_value = '{"log":"Error text", "exitstatus":"1"}' + dumpsMock.return_value = '{"valid_object": true}' + self.controller.sendRequest.return_value = {"log":"Error text", "exitstatus":"1"} self.assertEqual({u'exitstatus': u'1', u'log': u'Error text'}, self.controller.registerWithServer()) self.assertEqual(LiveStatus_mock.SERVICES, []) self.assertEqual(LiveStatus_mock.CLIENT_COMPONENTS, []) self.assertEqual(LiveStatus_mock.COMPONENTS, []) - self.controller.sendRequest.return_value = '{"responseId":1}' + self.controller.sendRequest.return_value = {"responseId":1} self.assertEqual({"responseId":1}, self.controller.registerWithServer()) - self.controller.sendRequest.return_value = '{"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}' + self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"} self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue") self.controller.isRegistered = False self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer()) @@ -105,7 +105,7 @@ class TestController(unittest.TestCase): raise Exception("test") return "request" - self.controller.sendRequest.return_value = '{"responseId":1}' + self.controller.sendRequest.return_value = {"responseId":1} dumpsMock.side_effect = side_effect self.controller.isRegistered = False @@ -306,7 +306,6 @@ class TestController(unittest.TestCase): def test_sendRequest(self, security_mock, requestMock): conMock = MagicMock() - conMock.request.return_value = "response" security_mock.CachedHTTPSConnection.return_value = conMock url = "url" data = "data" @@ -314,19 +313,34 @@ class TestController(unittest.TestCase): self.controller.cachedconnect = None - self.assertEqual("response", self.controller.sendRequest(url, data)) + conMock.request.return_value = '{"valid_object": true}' + actual = self.controller.sendRequest(url, data) + expected = json.loads('{"valid_object": true}') + self.assertEqual(actual, expected) + security_mock.CachedHTTPSConnection.assert_called_once_with( self.controller.config) requestMock.called_once_with(url, data, {'Content-Type': 'application/json'}) + conMock.request.return_value = '{invalid_object}' + actual = self.controller.sendRequest(url, data) + expected = {'exitstatus': 1, 'log': ('Response parsing failed! Request data: ' + data + + '; Response: {invalid_object}')} + self.assertEqual(actual, expected) + + conMock.request.side_effect = Exception() + actual = self.controller.sendRequest(url, data) + expected = {'exitstatus': 1, 'log': 'Request failed! Data: ' + data} + + self.assertEqual(actual, expected) + + @patch.object(threading._Event, "wait") @patch("time.sleep") - @patch("json.loads") @patch("json.dumps") - def test_heartbeatWithServer(self, dumpsMock, loadsMock, sleepMock, event_mock): - + def test_heartbeatWithServer(self, dumpsMock, sleepMock, event_mock): out = StringIO.StringIO() sys.stdout = out @@ -340,11 +354,11 @@ class TestController(unittest.TestCase): self.controller.responseId = 1 response = {"responseId":"2", "restartAgent":"false"} - loadsMock.return_value = response + sendRequest.return_value = response def one_heartbeat(*args, **kwargs): self.controller.DEBUG_STOP_HEARTBEATING = True - return "data" + return response sendRequest.side_effect = one_heartbeat @@ -364,7 +378,7 @@ class TestController(unittest.TestCase): raise Exception() if len(calls) > 0: self.controller.DEBUG_STOP_HEARTBEATING = True - return "data" + return response # exception, retry, successful and stop sendRequest.side_effect = retry @@ -374,6 +388,7 @@ class TestController(unittest.TestCase): self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS) # retry registration + self.controller.responseId = 2 response["registrationCommand"] = "true" sendRequest.side_effect = one_heartbeat self.controller.DEBUG_STOP_HEARTBEATING = False @@ -382,6 +397,7 @@ class TestController(unittest.TestCase): self.assertTrue(self.controller.repeatRegistration) # components are not mapped + self.controller.responseId = 2 response["registrationCommand"] = "false" response["hasMappedComponents"] = False sendRequest.side_effect = one_heartbeat @@ -391,6 +407,7 @@ class TestController(unittest.TestCase): self.assertFalse(self.controller.hasMappedComponents) # components are mapped + self.controller.responseId = 2 response["hasMappedComponents"] = True sendRequest.side_effect = one_heartbeat self.controller.DEBUG_STOP_HEARTBEATING = False @@ -399,6 +416,7 @@ class TestController(unittest.TestCase): self.assertTrue(self.controller.hasMappedComponents) # components are mapped + self.controller.responseId = 2 del response["hasMappedComponents"] sendRequest.side_effect = one_heartbeat self.controller.DEBUG_STOP_HEARTBEATING = False @@ -407,8 +425,8 @@ class TestController(unittest.TestCase): self.assertTrue(self.controller.hasMappedComponents) # wrong responseId => restart + self.controller.responseId = 2 response = {"responseId":"2", "restartAgent":"false"} - loadsMock.return_value = response restartAgent = MagicMock(name="restartAgent") self.controller.restartAgent = restartAgent @@ -491,12 +509,12 @@ class TestController(unittest.TestCase): self.controller.componentsUrl = "foo_url/" sendRequest = Mock() self.controller.sendRequest = sendRequest - self.controller.sendRequest.return_value = ('{"clusterName":"dummy_cluster_name",' - '"stackName":"dummy_stack_name",' - '"stackVersion":"dummy_stack_version",' - '"components":{"PIG":{"PIG":"CLIENT"},' - '"MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT",' - '"JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}}') + self.controller.sendRequest.return_value = {"clusterName":"dummy_cluster_name", + "stackName":"dummy_stack_name", + "stackVersion":"dummy_stack_version", + "components":{"PIG":{"PIG":"CLIENT"}, + "MAPREDUCE":{"MAPREDUCE_CLIENT":"CLIENT", + "JOBTRACKER":"MASTER","TASKTRACKER":"SLAVE"}}} self.controller.updateComponents("dummy_cluster_name") sendRequest.assert_called_with('foo_url/dummy_cluster_name', None) services_expected = [u'MAPREDUCE', u'PIG'] http://git-wip-us.apache.org/repos/asf/ambari/blob/bc5d82ca/ambari-agent/src/test/python/ambari_agent/TestSecurity.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py index db4b25e..d8955cf 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py +++ b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py @@ -325,8 +325,15 @@ class TestSecurity(unittest.TestCase): # expected pass - - + # Test malformed JSON response + open_mock.return_value.write.reset_mock() + loads_mock.side_effect = Exception() + try: + man.reqSignCrt() + self.fail("Expected exception here") + except ssl.SSLError: + pass + self.assertFalse(open_mock.return_value.write.called) @patch("subprocess.Popen") @patch("subprocess.Popen.communicate") @@ -339,6 +346,34 @@ class TestSecurity(unittest.TestCase): self.assertTrue(popen_mock.called) self.assertTrue(communicate_mock.called) + @patch("ambari_agent.hostname.hostname") + @patch('__builtin__.open', create=True, autospec=True) + @patch('urllib2.urlopen') + @patch.dict('os.environ', {'DUMMY_PASSPHRASE': 'dummy-passphrase'}) + def test_reqSignCrt_malformedJson(self, urlopen_mock, open_mock, hostname_mock): + hostname_mock.return_value = "dummy-hostname" + open_mock.return_value.read.return_value = "dummy_request" + self.config.set('security', 'keysdir', '/dummy-keysdir') + self.config.set('security', 'passphrase_env_var_name', 'DUMMY_PASSPHRASE') + man = CertificateManager(self.config) + + # test valid JSON response + urlopen_mock.return_value.read.return_value = '{"result": "OK", "signedCa":"dummy"}' + try: + man.reqSignCrt() + except ssl.SSLError: + self.fail("Unexpected exception!") + open_mock.return_value.write.assert_called_with(u'dummy') + + # test malformed JSON response + open_mock.return_value.write.reset_mock() + urlopen_mock.return_value.read.return_value = '{malformed_object}' + try: + man.reqSignCrt() + self.fail("Expected exception!") + except ssl.SSLError: + pass + self.assertFalse(open_mock.return_value.write.called) @patch.object(security.CertificateManager, "checkCertExists") def test_initSecurity(self, checkCertExists_method):
