[ https://issues.apache.org/jira/browse/AIRFLOW-811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723657#comment-16723657 ]
ASF GitHub Bot commented on AIRFLOW-811:
----------------------------------------

stale[bot] closed pull request #2026: [AIRFLOW-811] [BugFix] bash_operator does not return full output
URL: https://github.com/apache/incubator-airflow/pull/2026

As this is a foreign pull request (from a fork) and GitHub hides the original diff once the pull request is closed, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index ff2ed51b96..ebba0ee07e 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -13,11 +13,15 @@
 # limitations under the License.

-from builtins import bytes
+import logging
+import mmap
 import os
+import re
 import signal
+import io
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
+from builtins import bytes

 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -52,60 +56,90 @@ def __init__(
             bash_command,
             xcom_push=False,
             env=None,
+            log_output=True,
             output_encoding='utf-8',
+            output_regex_filter=None,
             *args, **kwargs):
         super(BashOperator, self).__init__(*args, **kwargs)
         self.bash_command = bash_command
         self.env = env
-        self.xcom_push_flag = xcom_push
+        self.xcom_push = xcom_push
+        self.log_output = log_output
         self.output_encoding = output_encoding
+        self.output_regex_filter = output_regex_filter
+        self.sp = None

     def execute(self, context):
         """
         Execute the bash command in a temporary directory
         which will be cleaned afterwards
         """
-        bash_command = self.bash_command
-        self.log.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
-            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
-
-                f.write(bytes(bash_command, 'utf_8'))
-                f.flush()
-                fname = f.name
+            self.log.info("Tmp dir root location: {0}".format(tmp_dir))
+            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as cmd_file, \
+                    NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stdout_file, \
+                    NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as stderr_file:
+
+                cmd_file.write(bytes(self.bash_command, 'utf_8'))
+                cmd_file.flush()
+                fname = cmd_file.name
                 script_location = tmp_dir + "/" + fname
-                self.log.info(
-                    "Temporary script location: %s",
-                    script_location
-                )
-                self.log.info("Running command: %s", bash_command)
-                sp = Popen(
-                    ['bash', fname],
-                    stdout=PIPE, stderr=STDOUT,
-                    cwd=tmp_dir, env=self.env,
-                    preexec_fn=os.setsid)
-
-                self.sp = sp
-
-                self.log.info("Output:")
-                line = ''
-                for line in iter(sp.stdout.readline, b''):
-                    line = line.decode(self.output_encoding).strip()
-                    self.log.info(line)
-                sp.wait()
-                self.log.info(
-                    "Command exited with return code %s",
-                    sp.returncode
-                )
-
-                if sp.returncode:
-                    raise AirflowException("Bash command failed")
-
-                if self.xcom_push_flag:
-                    return line
+                logging.info("Temporary script location: {0}".format(script_location))
+                logging.info("Running command: " + self.bash_command)
+                self.sp = Popen(
+                    ['bash', fname],
+                    stdout=stdout_file,
+                    stderr=stderr_file,
+                    cwd=tmp_dir,
+                    env=self.env,
+                    preexec_fn=os.setsid)
+
+                self.sp.wait()
+
+                exit_msg = "Command exited with return code {0}".format(self.sp.returncode)
+                if self.sp.returncode:
+                    stderr_output = None
+                    with io.open(stderr_file.name, 'r+', encoding=self.output_encoding) as stderr_file_handle:
+                        if os.path.getsize(stderr_file.name) > 0:
+                            stderr_output = mmap.mmap(stderr_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+                    raise AirflowException("Bash command failed, {0}, error: {1}"
+                                           .format(exit_msg, stderr_output))
+
+                logging.info(exit_msg)
+                output = None
+                if self.output_regex_filter or self.log_output or self.xcom_push:
+                    with io.open(stdout_file.name, 'r+', encoding=self.output_encoding) as stdout_file_handle:
+                        if os.path.getsize(stdout_file_handle.name) > 0:
+                            output = mmap.mmap(stdout_file_handle.fileno(), 0, access=mmap.ACCESS_READ)
+                            if self.output_regex_filter:
+                                pattern = self.output_regex_filter.encode("utf-8")
+                                try:
+                                    re.compile(pattern)
+                                except re.error:
+                                    raise AirflowException("command executed successfully, "
+                                                           "but invalid regex supplied: {0}"
+                                                           .format(self.output_regex_filter))
+
+                                filtered_output = re.search(pattern, output)
+                                if filtered_output:
+                                    return filtered_output.group().decode(self.output_encoding)
+                                else:
+                                    logging.warning("failed to match on output based on "
+                                                    "supplied regex: {0}"
+                                                    .format(self.output_regex_filter))
+
+                            if self.log_output:
+                                logging.info("stdout: {0}".format(output))
+
+                            if self.xcom_push:
+                                return output
+                        else:
+                            logging.warning("stdout file: {0} is empty".format(stdout_file.name))
+                else:
+                    logging.warning("Not logging stdout for the command: {0}"
+                                    .format(self.bash_command))

     def on_kill(self):
         self.log.info('Sending SIGTERM signal to bash process group')
         os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
-
diff --git a/tests/core.py b/tests/core.py
index 0c94137d15..c61d9a1fc2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -60,6 +60,7 @@ from lxml import html

 from airflow.exceptions import AirflowException
 from airflow.configuration import AirflowConfigException, run_command
+from airflow.models import DAG, TaskInstance
 from jinja2.sandbox import SecurityError
 from jinja2 import UndefinedError
@@ -419,6 +420,25 @@ def test_bash_operator_multi_byte_output(self):
             output_encoding='utf-8')
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

+    def test_bash_operator_multi_line_regexed_output(self):
+        t = BashOperator(
+            task_id='test_multi_line_bash_operator',
+            bash_command=r"echo 'this is first line. \n "
+                         r"this is 2nd line, \n "
+                         r"you can have n number of lines in output."
+                         r"This is last line.'",
+            xcom_push=True,
+            output_regex_filter="\s?\w+\s?\d\w+ line,",
+            dag=self.dag,
+            output_encoding='utf-8')
+
+        ti = TaskInstance(task=t, execution_date=datetime.now())
+        ti.run()
+        expected_regex_matched_output = ' is 2nd line,'
+        self.assertEqual(ti.xcom_pull(task_ids='test_multi_line_bash_operator',
+                                      key='return_value'),
+                         expected_regex_matched_output)
+
     def test_bash_operator_kill(self):
         import subprocess
         import psutil

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Bash_operator doesn't read multiline output
> --------------------------------------------
>
>                 Key: AIRFLOW-811
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-811
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jayesh
>            Assignee: Jayesh
>            Priority: Minor
>
> The following piece of code is the root cause of it:
> {code}
> line = ''
> for line in iter(sp.stdout.readline, b''):
>     line = line.decode(self.output_encoding).strip()
>     logging.info(line)
> {code}
> The loop overwrites `line` on every iteration, so only the last line of the command's output survives. I plan to fix it using a string buffer instead of a single-line string variable here.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
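For illustration only, here is a minimal sketch of the string-buffer approach the reporter describes: every decoded line from the subprocess is appended to a buffer instead of overwriting a single `line` variable, so the full multiline output is preserved. This is not the PR's implementation; the helper name `collect_output` and the standalone structure are hypothetical.

{code}
# Hypothetical sketch (not part of the PR): accumulate every decoded line
# from the subprocess in a buffer so multiline output is preserved.
import os
from subprocess import Popen, PIPE, STDOUT


def collect_output(bash_command, output_encoding='utf-8'):
    """Run bash_command through bash and return its full multiline stdout."""
    sp = Popen(['bash', '-c', bash_command],
               stdout=PIPE, stderr=STDOUT,
               preexec_fn=os.setsid)
    lines = []                                   # the "string buffer"
    for raw in iter(sp.stdout.readline, b''):
        lines.append(raw.decode(output_encoding).rstrip('\n'))
    sp.wait()
    if sp.returncode:
        raise RuntimeError("Bash command failed")
    return '\n'.join(lines)                      # full output, not just the last line


if __name__ == '__main__':
    print(collect_output("printf 'first line\\nsecond line\\n'"))
{code}

The pull request above takes a different route to the same end: stdout and stderr are redirected to temporary files and read back via mmap, which also avoids collecting the output line by line in memory.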