Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-stable e2d49f3c8 -> d2372d458


[AIRFLOW-1813] Bug SSH Operator empty buffer

The SSH Operator will throw an empty "SSH operator error" when running
commands that do not immediately log something to the terminal. This is
due to a call to stdout.channel.recv when the channel currently has a
0-size buffer, either because the command has not yet logged anything,
or never will (e.g. sleep 5).

Make code PEP8 compliant

Closes #2785 from RJKeevil/fix-ssh-operator-no-terminal-output

(cherry picked from commit d4d8eb932657f4d1eccfaa8bb1d12933535fae94)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d2372d45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d2372d45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d2372d45

Branch: refs/heads/v1-9-stable
Commit: d2372d458d574b14cae73853d820d9b007f0c179
Parents: e2d49f3
Author: Rob Keevil <robkee...@gmail.com>
Authored: Mon Nov 13 16:01:15 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Nov 13 17:00:45 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/ssh_operator.py    | 25 +++++++++++++----------
 tests/contrib/operators/test_ssh_operator.py | 18 ++++++++++++++++
 2 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d2372d45/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index fbbf86c..9f2ca81 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -23,7 +23,6 @@ from airflow.utils.decorators import apply_defaults
 
 
 class SSHOperator(BaseOperator):
-
     """
     SSHOperator to execute commands on given remote host using the ssh_hook.
 
@@ -94,10 +93,15 @@ class SSHOperator(BaseOperator):
             stdin.close()
             channel.shutdown_write()
 
-            agg_stdout=b''
-            agg_stderr=b''
+            agg_stdout = b''
+            agg_stderr = b''
+
+            # capture any initial output in case channel is closed already
+            stdout_buffer_length = len(stdout.channel.in_buffer)
+
+            if stdout_buffer_length > 0:
+                agg_stdout += stdout.channel.recv(stdout_buffer_length)
 
-            agg_stdout+=stdout.channel.recv(len(stdout.channel.in_buffer))
             # read from both stdout and stderr
             while not channel.closed or channel.recv_ready() or 
channel.recv_stderr_ready():
                 readq, _, _ = select([channel], [], [], self.timeout)
@@ -105,17 +109,16 @@ class SSHOperator(BaseOperator):
                     if c.recv_ready():
                         line = stdout.channel.recv(len(c.in_buffer))
                         line = line
-                        agg_stdout+=line
+                        agg_stdout += line
                         self.log.info(line.decode('utf-8').strip('\n'))
                     if c.recv_stderr_ready():
                         line = 
stderr.channel.recv_stderr(len(c.in_stderr_buffer))
                         line = line
-                        agg_stderr+=line
+                        agg_stderr += line
                         self.log.warning(line.decode('utf-8').strip('\n'))
-                if stdout.channel.exit_status_ready() \
-                    and not stderr.channel.recv_stderr_ready() \
-                    and not stdout.channel.recv_ready():
-
+                if stdout.channel.exit_status_ready()\
+                        and not stderr.channel.recv_stderr_ready()\
+                        and not stdout.channel.recv_ready():
                     stdout.channel.shutdown_read()
                     stdout.channel.close()
                     break
@@ -137,7 +140,7 @@ class SSHOperator(BaseOperator):
             else:
                 error_msg = agg_stderr.decode('utf-8')
                 raise AirflowException("error running cmd: {0}, error: {1}"
-                                        .format(self.command, error_msg))
+                                       .format(self.command, error_msg))
 
         except Exception as e:
             raise AirflowException("SSH operator error: {0}".format(str(e)))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d2372d45/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
index f205b97..019dfe4 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -107,5 +107,23 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertIsNotNone(ti.duration)
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
 
+    def test_no_output_command(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
+        task = SSHOperator(
+            task_id="test",
+            ssh_hook=self.hook,
+            command="sleep 1",
+            do_xcom_push=True,
+            dag=self.dag,
+        )
+
+        self.assertIsNotNone(task)
+
+        ti = TaskInstance(
+            task=task, execution_date=datetime.now())
+        ti.run()
+        self.assertIsNotNone(ti.duration)
+        self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'')
+
 if __name__ == '__main__':
     unittest.main()

Reply via email to