kaxil commented on pull request #5048:
URL: https://github.com/apache/airflow/pull/5048#issuecomment-660121750
Using `write_stdout=True` is causing issues for retrieving Logs for tasks
using SubDagOperators (when using Subprocess to spawn tasks - no os.fork).
Example:
```
Executing <Task(SubDagOperator): section-2> on 2020-07-14T00:00:00+00:00
Running on host: universal-intensity-7339-worker-55c457dc84-hpr2x
Running: ['airflow', 'run', 'example_subdag_operator_1', 'section-2',
'2020-07-14T00:00:00+00:00', '--job_id', '8', '--pool', 'default_pool',
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path',
'/tmp/tmpur_az7dt']
Job 8: Subtask section-2 [2020-07-16 19:58:56,030] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=231
Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {__init__.py:51} INFO -
Using executor CeleryExecutor
Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {dagbag.py:413} INFO -
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
Job 8: Subtask section-2 [2020-07-16 19:58:56,904] {cli.py:516} INFO -
Running <TaskInstance: example_subdag_operator_1.section-2
2020-07-14T00:00:00+00:00 [running]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x
Job 8: Subtask section-2 [2020-07-16 19:58:57,171] {logging_mixin.py:95}
INFO - [2020-07-16 19:58:57,171] {base_executor.py:59} INFO - Adding to queue:
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
Job 8: Subtask section-2 [2020-07-16 19:59:01,961] {logging_mixin.py:95}
INFO - [2020-07-16 19:59:01,961] {sequential_executor.py:47} INFO - Executing
command: ['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
Job 8: Subtask section-2 [2020-07-16 19:59:02,730] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=250
Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {__init__.py:51} INFO -
Using executor CeleryExecutor
Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {dagbag.py:413} INFO -
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
Job 8: Subtask section-2 [2020-07-16 19:59:03,631] {cli.py:516} INFO -
Running <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,684", "filename":
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]>", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename":
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]>", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename":
"taskinstance.py", "lineno": 838, "levelname": "INFO", "message":
"\n--------------------------------------------------------------------------------",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename":
"taskinstance.py", "lineno": 839, "levelname": "INFO", "message": "Starting
attempt 1 of 1", "dag_id": "example_subdag_operator_1.section-2", "task_id":
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename":
"taskinstance.py", "lineno": 840, "levelname": "INFO", "message":
"\n--------------------------------------------------------------------------------",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,721", "filename":
"taskinstance.py", "lineno": 859, "levelname": "INFO", "message": "Executing
<Task(BashOperator): bash-task> on 2020-07-14T00:00:00+00:00", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename":
"base_task_runner.py", "lineno": 133, "levelname": "INFO", "message": "Running
on host: universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename":
"base_task_runner.py", "lineno": 134, "levelname": "INFO", "message": "Running:
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--job_id', '10', '--pool', 'default_pool',
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path',
'/tmp/tmprv20h5i5']", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:04,532", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:04,532] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=268", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,373", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,373] {__init__.py:51} INFO - Using
executor CeleryExecutor", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,374", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,374] {dagbag.py:413} INFO - Filling up
the DagBag from /usr/local/airflow/dags/example_subdag_operator.py", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,458", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,458] {cli.py:516} INFO - Running
<TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [running]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:112} INFO - Tmp
dir root location: ", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task /tmp", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:122} INFO -
Exporting the following env vars:", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_DAG_ID=example_subdag_operator_1.section-2",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_TASK_ID=bash-task", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_EXECUTION_DATE=2020-07-14T00:00:00+00:00",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_DAG_RUN_ID=backfill_2020-07-14T00:00:00+00:00",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,493] {bash_operator.py:136} INFO -
Temporary script location: /tmp/airflowtmpo66_2jmc/bash-taskh1s3dumt",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,494] {bash_operator.py:146} INFO -
Running command: echo hi", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,502] {bash_operator.py:155} INFO -
Output:", "dag_id": "example_subdag_operator_1.section-2", "task_id":
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,503] {bash_operator.py:159} INFO - hi",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,504", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,504] {bash_operator.py:163} INFO -
Command exited with return code 0", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,541", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task
/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning:
The psycopg2 wheel package will be renamed from release 2.8; in order to keep
installing from binary please use \"pip install psycopg2-binary\" instead. For
details see:
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,542", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task \"\"\")", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:08,672", "filename":
"logging_mixin.py", "lineno": 95, "levelname": "INFO", "message": "[2020-07-16
19:59:08,672] {local_task_job.py:105} INFO - Task exited with return code 0",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2
end_of_log/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144:
UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in
order to keep installing from binary please use "pip install psycopg2-binary"
instead. For details see:
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
Job 8: Subtask section-2 """)
```
The logs for all the tasks in Subdag seems to shown in the task using
SubDagOperator (in parent DAG).
https://github.com/apache/airflow/blob/master/airflow/task/task_runner/base_task_runner.py#L124-L141
- When using subprocess for running the task, all the logs are printed on
stdout and gathered by log_reader thread.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]