This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e453e68158 Fix task log is not captured (#23684)
e453e68158 is described below
commit e453e68158b65177572a3a1f95d8542c3a38d4e7
Author: Ping Zhang <[email protected]>
AuthorDate: Tue May 17 09:29:52 2022 -0700
Fix task log is not captured (#23684)
when StandardTaskRunner runs tasks with exec
Issue: https://github.com/apache/airflow/issues/23540
---
airflow/models/taskinstance.py | 6 ++++--
tests/cli/commands/test_task_command.py | 2 +-
tests/models/test_taskinstance.py | 6 ++++++
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 0d30727615..f08b3ba812 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -520,7 +520,8 @@ class TaskInstance(Base, LoggingMixin):
self.task_id = task.task_id
self.map_index = map_index
self.refresh_from_task(task)
- self._log = logging.getLogger("airflow.task")
+ # init_on_load will config the log
+ self.init_on_load()
if run_id is None and execution_date is not None:
from airflow.models.dagrun import DagRun # Avoid circular import
@@ -563,7 +564,6 @@ class TaskInstance(Base, LoggingMixin):
if state:
self.state = state
self.hostname = ''
- self.init_on_load()
# Is this TaskInstance being currently running within `airflow tasks run --raw`.
# Not persisted to the database so only valid for the current process
self.raw = False
@@ -594,6 +594,8 @@ class TaskInstance(Base, LoggingMixin):
@reconstructor
def init_on_load(self):
"""Initialize the attributes that aren't stored in the DB"""
+ # correctly config the ti log
+ self._log = logging.getLogger("airflow.task")
self.test_mode = False # can be changed when calling 'run'
@property
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a998b69392..1c3f9a7759 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -111,7 +111,7 @@ class TestCliTasks(unittest.TestCase):
args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
- with self.assertLogs('airflow.models', level='INFO') as cm:
+ with self.assertLogs('airflow.task', level='INFO') as cm:
task_command.task_test(args)
assert any(
[
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 57f1b2f4aa..220c04d7cc 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -244,6 +244,12 @@ class TestTaskInstance:
assert op2 in op1.downstream_list
assert op2 in op3.downstream_list
+ def test_init_on_load(self, create_task_instance):
+ ti = create_task_instance()
+ # ensure log is correctly created for ORM ti
+ assert ti.log.name == 'airflow.task'
+ assert not ti.test_mode
+
@patch.object(DAG, 'get_concurrency_reached')
def test_requeue_over_dag_concurrency(self, mock_concurrency_reached, create_task_instance):
mock_concurrency_reached.return_value = True