Repository: incubator-airflow Updated Branches: refs/heads/master fd381a11e -> 4f459b64e
[AIRFLOW-1217] Enable Sqoop logging The output of the subprocess was not redirected to airflow, now the sqoop output gets written to Airflow and captured in the logs. Extended the tests of the sqoop popen operation. Closes #2307 from Fokko/AIRFLOW-1217-enable-sqoop-logging Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4f459b64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4f459b64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4f459b64 Branch: refs/heads/master Commit: 4f459b64e098f4fd73af23030e7992e0465bfe21 Parents: fd381a1 Author: Fokko Driesprong <[email protected]> Authored: Sun May 21 20:30:25 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Sun May 21 20:30:25 2017 +0200 ---------------------------------------------------------------------- airflow/contrib/hooks/sqoop_hook.py | 39 +++++++++++------- tests/contrib/hooks/test_sqoop_hook.py | 63 ++++++++++++++++++++++++----- 2 files changed, 78 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f459b64/airflow/contrib/hooks/sqoop_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py index 2dd33ae..7fbb6c5 100644 --- a/airflow/contrib/hooks/sqoop_hook.py +++ b/airflow/contrib/hooks/sqoop_hook.py @@ -68,12 +68,20 @@ class SqoopHook(BaseHook): self.hcatalog_database = hcatalog_database self.hcatalog_table = hcatalog_table self.verbose = verbose - self.num_mappers = str(num_mappers) + self.num_mappers = num_mappers self.properties = properties def get_conn(self): pass + def cmd_mask_password(self, cmd): + try: + password_index = cmd.index('--password') + cmd[password_index + 1] = 'MASKED' + except ValueError: + logging.debug("No 
password in sqoop cmd") + return cmd + def Popen(self, cmd, **kwargs): """ Remote Popen @@ -82,18 +90,21 @@ class SqoopHook(BaseHook): :param kwargs: extra arguments to Popen (see subprocess.Popen) :return: handle to subprocess """ - process = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs) - output, stderr = process.communicate() - - if process.returncode != 0: - raise AirflowException( - "Cannot execute {} on {}. Error code is: {} Output: {}, " - "Stderr: {}".format(cmd, self.conn.host, process.returncode, - output, stderr) - ) + logging.info("Executing command: {}".format(' '.join(cmd))) + sp = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + **kwargs) + + for line in iter(sp.stdout): + logging.info(line.strip()) + + sp.wait() + + logging.info("Command exited with return code {0}".format(sp.returncode)) + + if sp.returncode: + raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd))) def _prepare_command(self, export=False): if export: @@ -120,7 +131,7 @@ class SqoopHook(BaseHook): if self.archives: connection_cmd += ["-archives", self.archives] if self.num_mappers: - connection_cmd += ["--num-mappers", self.num_mappers] + connection_cmd += ["--num-mappers", str(self.num_mappers)] if self.hcatalog_database: connection_cmd += ["--hcatalog-database", self.hcatalog_database] if self.hcatalog_table: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4f459b64/tests/contrib/hooks/test_sqoop_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py index 94fcada..7c934b9 100644 --- a/tests/contrib/hooks/test_sqoop_hook.py +++ b/tests/contrib/hooks/test_sqoop_hook.py @@ -21,6 +21,10 @@ from airflow.contrib.hooks.sqoop_hook import SqoopHook from airflow.exceptions import AirflowException from airflow.utils import db +from mock import patch, call + +from io import 
StringIO + class TestSqoopHook(unittest.TestCase): _config = { @@ -69,20 +73,46 @@ class TestSqoopHook(unittest.TestCase): configuration.load_test_config() db.merge_conn( models.Connection( - conn_id='sqoop_test', conn_type='sqoop', + conn_id='sqoop_test', conn_type='sqoop', schema='schema', host='rmdbs', port=5050, extra=json.dumps(self._config_json) ) ) - def test_popen(self): - hook = SqoopHook(**self._config) - - # Should go well - hook.Popen(['ls']) - - # Should give an exception - with self.assertRaises(OSError): - hook.Popen('exit 1') + @patch('subprocess.Popen') + def test_popen(self, mock_popen): + # Given + mock_popen.return_value.stdout = StringIO(u'stdout') + mock_popen.return_value.stderr = StringIO(u'stderr') + mock_popen.return_value.returncode = 0 + mock_popen.return_value.communicate.return_value = [StringIO(u'stdout\nstdout'), StringIO(u'stderr\nstderr')] + + # When + hook = SqoopHook(conn_id='sqoop_test') + hook.export_table(**self._config_export) + + # Then + self.assertEqual(mock_popen.mock_calls[0], call( + ['sqoop', + 'export', + '-jt', self._config_json['job_tracker'], + '-libjars', self._config_json['libjars'], + '-files', self._config_json['files'], + '-fs', self._config_json['namenode'], + '-archives', self._config_json['archives'], + '--connect', 'rmdbs:5050/schema', + '--input-null-string', self._config_export['input_null_string'], + '--input-null-non-string', self._config_export['input_null_non_string'], + '--staging-table', self._config_export['staging_table'], + '--clear-staging-table', + '--enclosed-by', self._config_export['enclosed_by'], + '--escaped-by', self._config_export['escaped_by'], + '--input-fields-terminated-by', self._config_export['input_fields_terminated_by'], + '--input-lines-terminated-by', self._config_export['input_lines_terminated_by'], + '--input-optionally-enclosed-by', self._config_export['input_optionally_enclosed_by'], + '--batch', + '--relaxed-isolation', + '--export-dir', self._config_export['export_dir'], 
+ '--table', self._config_export['table']], stderr=-2, stdout=-1)) def test_submit(self): hook = SqoopHook(**self._config) @@ -228,6 +258,19 @@ class TestSqoopHook(unittest.TestCase): with self.assertRaises(AirflowException): hook._get_export_format_argument('unknown') + def test_cmd_mask_password(self): + hook = SqoopHook() + self.assertEqual( + hook.cmd_mask_password(['--password', 'supersecret']), + ['--password', 'MASKED'] + ) + + cmd = ['--target', 'targettable'] + self.assertEqual( + hook.cmd_mask_password(cmd), + cmd + ) + if __name__ == '__main__': unittest.main()
