kaxil commented on pull request #5048:
URL: https://github.com/apache/airflow/pull/5048#issuecomment-660121750


   Using `write_stdout=True` is causing issues for retrieving Logs for tasks 
using SubDagOperators (when using Subprocess to spawn tasks - no os.fork).
   
   Example:
   
   ```
   Executing <Task(SubDagOperator): section-2> on 2020-07-14T00:00:00+00:00
   Running on host: universal-intensity-7339-worker-55c457dc84-hpr2x
   Running: ['airflow', 'run', 'example_subdag_operator_1', 'section-2', 
'2020-07-14T00:00:00+00:00', '--job_id', '8', '--pool', 'default_pool', 
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', 
'/tmp/tmpur_az7dt']
   Job 8: Subtask section-2 [2020-07-16 19:58:56,030] {settings.py:213} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, 
pool_recycle=1800, pid=231
   Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {__init__.py:51} INFO - 
Using executor CeleryExecutor
   Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {dagbag.py:413} INFO - 
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
   Job 8: Subtask section-2 [2020-07-16 19:58:56,904] {cli.py:516} INFO - 
Running <TaskInstance: example_subdag_operator_1.section-2 
2020-07-14T00:00:00+00:00 [running]> on host 
universal-intensity-7339-worker-55c457dc84-hpr2x
   Job 8: Subtask section-2 [2020-07-16 19:58:57,171] {logging_mixin.py:95} 
INFO - [2020-07-16 19:58:57,171] {base_executor.py:59} INFO - Adding to queue: 
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task', 
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', 
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
   Job 8: Subtask section-2 [2020-07-16 19:59:01,961] {logging_mixin.py:95} 
INFO - [2020-07-16 19:59:01,961] {sequential_executor.py:47} INFO - Executing 
command: ['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task', 
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', 
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
   Job 8: Subtask section-2 [2020-07-16 19:59:02,730] {settings.py:213} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, 
pool_recycle=1800, pid=250
   Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {__init__.py:51} INFO - 
Using executor CeleryExecutor
   Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {dagbag.py:413} INFO - 
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
   Job 8: Subtask section-2 [2020-07-16 19:59:03,631] {cli.py:516} INFO - 
Running <TaskInstance: example_subdag_operator_1.section-2.bash-task 
2020-07-14T00:00:00+00:00 [queued]> on host 
universal-intensity-7339-worker-55c457dc84-hpr2x
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,684", "filename": 
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies 
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task 
2020-07-14T00:00:00+00:00 [queued]>", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename": 
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies 
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task 
2020-07-14T00:00:00+00:00 [queued]>", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename": 
"taskinstance.py", "lineno": 838, "levelname": "INFO", "message": 
"\n--------------------------------------------------------------------------------",
 "dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename": 
"taskinstance.py", "lineno": 839, "levelname": "INFO", "message": "Starting 
attempt 1 of 1", "dag_id": "example_subdag_operator_1.section-2", "task_id": 
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename": 
"taskinstance.py", "lineno": 840, "levelname": "INFO", "message": 
"\n--------------------------------------------------------------------------------",
 "dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,721", "filename": 
"taskinstance.py", "lineno": 859, "levelname": "INFO", "message": "Executing 
<Task(BashOperator): bash-task> on 2020-07-14T00:00:00+00:00", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename": 
"base_task_runner.py", "lineno": 133, "levelname": "INFO", "message": "Running 
on host: universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename": 
"base_task_runner.py", "lineno": 134, "levelname": "INFO", "message": "Running: 
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task', 
'2020-07-14T00:00:00+00:00', '--job_id', '10', '--pool', 'default_pool', 
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', 
'/tmp/tmprv20h5i5']", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:04,532", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:04,532] {settings.py:213} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, 
pool_recycle=1800, pid=268", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,373", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,373] {__init__.py:51} INFO - Using 
executor CeleryExecutor", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,374", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,374] {dagbag.py:413} INFO - Filling up 
the DagBag from /usr/local/airflow/dags/example_subdag_operator.py", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,458", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,458] {cli.py:516} INFO - Running 
<TaskInstance: example_subdag_operator_1.section-2.bash-task 
2020-07-14T00:00:00+00:00 [running]> on host 
universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:112} INFO - Tmp 
dir root location: ", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task  /tmp", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:122} INFO - 
Exporting the following env vars:", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task AIRFLOW_CTX_DAG_ID=example_subdag_operator_1.section-2", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task AIRFLOW_CTX_TASK_ID=bash-task", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task AIRFLOW_CTX_EXECUTION_DATE=2020-07-14T00:00:00+00:00", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task AIRFLOW_CTX_DAG_RUN_ID=backfill_2020-07-14T00:00:00+00:00", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,493] {bash_operator.py:136} INFO - 
Temporary script location: /tmp/airflowtmpo66_2jmc/bash-taskh1s3dumt", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,494] {bash_operator.py:146} INFO - 
Running command: echo hi", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,502] {bash_operator.py:155} INFO - 
Output:", "dag_id": "example_subdag_operator_1.section-2", "task_id": 
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,503] {bash_operator.py:159} INFO - hi", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,504", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task [2020-07-16 19:59:05,504] {bash_operator.py:163} INFO - 
Command exited with return code 0", "dag_id": 
"example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,541", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task 
/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: 
The psycopg2 wheel package will be renamed from release 2.8; in order to keep 
installing from binary please use \"pip install psycopg2-binary\" instead. For 
details see: 
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,542", "filename": 
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10: 
Subtask bash-task   \"\"\")", "dag_id": "example_subdag_operator_1.section-2", 
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000", 
"try_number": "1"}
   Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:08,672", "filename": 
"logging_mixin.py", "lineno": 95, "levelname": "INFO", "message": "[2020-07-16 
19:59:08,672] {local_task_job.py:105} INFO - Task exited with return code 0", 
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task", 
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
   Job 8: Subtask section-2 
end_of_log/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: 
UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in 
order to keep installing from binary please use "pip install psycopg2-binary" 
instead. For details see: 
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
   Job 8: Subtask section-2   """)
   ```
   
   The logs for all the tasks in Subdag seems to shown in the task using 
SubDagOperator (in parent DAG). 
   
   
https://github.com/apache/airflow/blob/master/airflow/task/task_runner/base_task_runner.py#L124-L141
 - When using subprocess for running the task, all the logs are printed on 
stdout and gathered by log_reader thread.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to