kaxil closed pull request #4051: [AIRFLOW-3064] Show logs/output from operators
in `airflow test` command
URL: https://github.com/apache/incubator-airflow/pull/4051
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index eeef6e5eff..675a88a63c 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -641,6 +641,11 @@ def list_tasks(args, dag=None):
@cli_utils.action_logging
def test(args, dag=None):
+ # We want log output from operators etc to show up here. Normally
+ # airflow.task would redirect to a file, but here we want it to propagate
+ # up to the normal airflow handler.
+ logging.getLogger('airflow.task').propagate = True
+
dag = dag or get_dag(args)
task = dag.get_task(task_id=args.task_id)
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index aeafdd85fe..f274fe907d 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -18,6 +18,8 @@
# under the License.
#
+from six import StringIO
+import sys
import unittest
from datetime import datetime, timedelta, time
@@ -28,6 +30,7 @@
import subprocess
from argparse import Namespace
from airflow import settings
+import airflow.bin.cli as cli
from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
from airflow.models import TaskInstance
from airflow.utils import timezone
@@ -183,6 +186,29 @@ def test_local_run(self):
state = ti.current_state()
self.assertEqual(state, State.SUCCESS)
+ def test_test(self):
+ """Test the `airflow test` command"""
+ args = create_mock_args(
+ task_id='print_the_context',
+ dag_id='example_python_operator',
+ subdir=None,
+ execution_date=timezone.parse('2018-01-01')
+ )
+
+ saved_stdout = sys.stdout
+ try:
+ sys.stdout = out = StringIO()
+ cli.test(args)
+
+ output = out.getvalue()
+ # Check that prints, and log messages, are shown
+ self.assertIn('Done. Returned value was: Whatever you return gets printed in the logs',
+ output)
+ self.assertIn("'example_python_operator__print_the_context__20180101'",
+ output)
+ finally:
+ sys.stdout = saved_stdout
+
def test_next_execution(self):
# A scaffolding function
def reset_dr_db(dag_id):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services