Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1ef2b6c2d -> 1475e6707


[AIRFLOW-1732] Improve dataflow hook logging

Closes #2702 from TrevorEdwards/1732


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1475e670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1475e670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1475e670

Branch: refs/heads/master
Commit: 1475e67078a518bfff59dbf05a1b9c0d98e0218c
Parents: 1ef2b6c
Author: Trevor Edwards <[email protected]>
Authored: Tue Oct 24 10:14:21 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Oct 24 10:14:37 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataflow_hook.py    |  6 ++++-
 tests/contrib/hooks/test_gcp_dataflow_hook.py | 29 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1475e670/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py 
b/airflow/contrib/hooks/gcp_dataflow_hook.py
index b1a1e0e..b1e1474 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -90,12 +90,16 @@ class _DataflowJob(LoggingMixin):
 
 class _Dataflow(LoggingMixin):
     def __init__(self, cmd):
+        self.log.info("Running command: %s", ' '.join(cmd))
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            line = self._proc.stderr.readline()
+            lines = self._proc.stderr.readlines()
+            for line in lines:
+              self.log.warning(line[:-1])
+            line = lines[-1][:-1]
             return line
         if fd == self._proc.stdout.fileno():
             line = self._proc.stdout.readline()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1475e670/tests/contrib/hooks/test_gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_gcp_dataflow_hook.py 
b/tests/contrib/hooks/test_gcp_dataflow_hook.py
index 797d40c..1ab5a99 100644
--- a/tests/contrib/hooks/test_gcp_dataflow_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -14,7 +14,11 @@
 #
 
 import unittest
+from mock import call
+from mock import MagicMock
+
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
+from airflow.contrib.hooks.gcp_dataflow_hook import _Dataflow
 
 try:
     from unittest import mock
@@ -54,3 +58,28 @@ class DataFlowHookTest(unittest.TestCase):
             dataflow=PY_FILE, py_options=PY_OPTIONS)
         internal_dataflow_mock.assert_called_once_with(
             TASK_ID, OPTIONS, PY_FILE, mock.ANY, ['python'] + PY_OPTIONS)
+
+    @mock.patch('airflow.contrib.hooks.gcp_dataflow_hook._Dataflow.log')
+    @mock.patch('subprocess.Popen')
+    @mock.patch('select.select')
+    def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, 
mock_logging):
+      mock_logging.info = MagicMock()
+      mock_logging.warning = MagicMock()
+      mock_proc = MagicMock()
+      mock_proc.stderr = MagicMock()
+      mock_proc.stderr.readlines = MagicMock(return_value=['test\n','error\n'])
+      mock_stderr_fd = MagicMock()
+      mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
+      mock_proc_poll = MagicMock()
+      mock_select.return_value = [[mock_stderr_fd]]
+      def poll_resp_error():
+        mock_proc.return_code = 1
+        return True
+      mock_proc_poll.side_effect=[None, poll_resp_error]
+      mock_proc.poll = mock_proc_poll
+      mock_popen.return_value = mock_proc
+      dataflow = _Dataflow(['test', 'cmd'])
+      mock_logging.info.assert_called_with('Running command: %s', 'test cmd')
+      self.assertRaises(Exception, dataflow.wait_for_done)
+      mock_logging.warning.assert_has_calls([call('test'), call('error')])
+

Reply via email to