Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1ef2b6c2d -> 1475e6707
[AIRFLOW-1732] Improve dataflow hook logging

Closes #2702 from TrevorEdwards/1732


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1475e670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1475e670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1475e670

Branch: refs/heads/master
Commit: 1475e67078a518bfff59dbf05a1b9c0d98e0218c
Parents: 1ef2b6c
Author: Trevor Edwards <[email protected]>
Authored: Tue Oct 24 10:14:21 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Oct 24 10:14:37 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py    |  6 ++++-
 tests/contrib/hooks/test_gcp_dataflow_hook.py | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1475e670/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index b1a1e0e..b1e1474 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -90,12 +90,16 @@ class _DataflowJob(LoggingMixin):
 
 class _Dataflow(LoggingMixin):
     def __init__(self, cmd):
+        self.log.info("Running command: %s", ' '.join(cmd))
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            line = self._proc.stderr.readline()
+            lines = self._proc.stderr.readlines()
+            for line in lines:
+                self.log.warning(line[:-1])
+            line = lines[-1][:-1]
             return line
         if fd == self._proc.stdout.fileno():
             line = self._proc.stdout.readline()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1475e670/tests/contrib/hooks/test_gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcp_dataflow_hook.py b/tests/contrib/hooks/test_gcp_dataflow_hook.py
index 797d40c..1ab5a99 100644
--- a/tests/contrib/hooks/test_gcp_dataflow_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -14,7 +14,11 @@
 #
 
 import unittest
+from mock import call
+from mock import MagicMock
+
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+from airflow.contrib.hooks.gcp_dataflow_hook import _Dataflow
 
 try:
     from unittest import mock
@@ -54,3 +58,28 @@ class DataFlowHookTest(unittest.TestCase):
             dataflow=PY_FILE, py_options=PY_OPTIONS)
         internal_dataflow_mock.assert_called_once_with(
             TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
+
+    @mock.patch('airflow.contrib.hooks.gcp_dataflow_hook._Dataflow.log')
+    @mock.patch('subprocess.Popen')
+    @mock.patch('select.select')
+    def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging):
+        mock_logging.info = MagicMock()
+        mock_logging.warning = MagicMock()
+        mock_proc = MagicMock()
+        mock_proc.stderr = MagicMock()
+        mock_proc.stderr.readlines = MagicMock(return_value=['test\n','error\n'])
+        mock_stderr_fd = MagicMock()
+        mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
+        mock_proc_poll = MagicMock()
+        mock_select.return_value = [[mock_stderr_fd]]
+        def poll_resp_error():
+            mock_proc.return_code = 1
+            return True
+        mock_proc_poll.side_effect=[None, poll_resp_error]
+        mock_proc.poll = mock_proc_poll
+        mock_popen.return_value = mock_proc
+        dataflow = _Dataflow(['test', 'cmd'])
+        mock_logging.info.assert_called_with('Running command: %s', 'test cmd')
+        self.assertRaises(Exception, dataflow.wait_for_done)
+        mock_logging.warning.assert_has_calls([call('test'), call('error')])
+
