[
https://issues.apache.org/jira/browse/AIRFLOW-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159945#comment-17159945
]
ASF GitHub Bot commented on AIRFLOW-3370:
-----------------------------------------
kaxil commented on pull request #5048:
URL: https://github.com/apache/airflow/pull/5048#issuecomment-660121750
Using `write_stdout=True` causes issues when retrieving logs for tasks that
use SubDagOperator (when tasks are spawned via subprocess rather than `os.fork`).
Example:
```
Executing <Task(SubDagOperator): section-2> on 2020-07-14T00:00:00+00:00
Running on host: universal-intensity-7339-worker-55c457dc84-hpr2x
Running: ['airflow', 'run', 'example_subdag_operator_1', 'section-2',
'2020-07-14T00:00:00+00:00', '--job_id', '8', '--pool', 'default_pool',
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path',
'/tmp/tmpur_az7dt']
Job 8: Subtask section-2 [2020-07-16 19:58:56,030] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=231
Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {__init__.py:51} INFO -
Using executor CeleryExecutor
Job 8: Subtask section-2 [2020-07-16 19:58:56,818] {dagbag.py:413} INFO -
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
Job 8: Subtask section-2 [2020-07-16 19:58:56,904] {cli.py:516} INFO -
Running <TaskInstance: example_subdag_operator_1.section-2
2020-07-14T00:00:00+00:00 [running]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x
Job 8: Subtask section-2 [2020-07-16 19:58:57,171] {logging_mixin.py:95}
INFO - [2020-07-16 19:58:57,171] {base_executor.py:59} INFO - Adding to queue:
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
Job 8: Subtask section-2 [2020-07-16 19:59:01,961] {logging_mixin.py:95}
INFO - [2020-07-16 19:59:01,961] {sequential_executor.py:47} INFO - Executing
command: ['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path', '/tmp/tmpbuxpv211']
Job 8: Subtask section-2 [2020-07-16 19:59:02,730] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=250
Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {__init__.py:51} INFO -
Using executor CeleryExecutor
Job 8: Subtask section-2 [2020-07-16 19:59:03,541] {dagbag.py:413} INFO -
Filling up the DagBag from /usr/local/airflow/dags/example_subdag_operator.py
Job 8: Subtask section-2 [2020-07-16 19:59:03,631] {cli.py:516} INFO -
Running <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,684", "filename":
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]>", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename":
"taskinstance.py", "lineno": 620, "levelname": "INFO", "message": "Dependencies
all met for <TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [queued]>", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,696", "filename":
"taskinstance.py", "lineno": 838, "levelname": "INFO", "message":
"\n--------------------------------------------------------------------------------",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename":
"taskinstance.py", "lineno": 839, "levelname": "INFO", "message": "Starting
attempt 1 of 1", "dag_id": "example_subdag_operator_1.section-2", "task_id":
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,697", "filename":
"taskinstance.py", "lineno": 840, "levelname": "INFO", "message":
"\n--------------------------------------------------------------------------------",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,721", "filename":
"taskinstance.py", "lineno": 859, "levelname": "INFO", "message": "Executing
<Task(BashOperator): bash-task> on 2020-07-14T00:00:00+00:00", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename":
"base_task_runner.py", "lineno": 133, "levelname": "INFO", "message": "Running
on host: universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:03,722", "filename":
"base_task_runner.py", "lineno": 134, "levelname": "INFO", "message": "Running:
['airflow', 'run', 'example_subdag_operator_1.section-2', 'bash-task',
'2020-07-14T00:00:00+00:00', '--job_id', '10', '--pool', 'default_pool',
'--raw', '-sd', 'DAGS_FOLDER/example_subdag_operator.py', '--cfg_path',
'/tmp/tmprv20h5i5']", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:04,532", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:04,532] {settings.py:213} INFO -
settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10,
pool_recycle=1800, pid=268", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,373", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,373] {__init__.py:51} INFO - Using
executor CeleryExecutor", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,374", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,374] {dagbag.py:413} INFO - Filling up
the DagBag from /usr/local/airflow/dags/example_subdag_operator.py", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,458", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,458] {cli.py:516} INFO - Running
<TaskInstance: example_subdag_operator_1.section-2.bash-task
2020-07-14T00:00:00+00:00 [running]> on host
universal-intensity-7339-worker-55c457dc84-hpr2x", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:112} INFO - Tmp
dir root location: ", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,492", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task /tmp", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,492] {bash_operator.py:122} INFO -
Exporting the following env vars:", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_DAG_ID=example_subdag_operator_1.section-2",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_TASK_ID=bash-task", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_EXECUTION_DATE=2020-07-14T00:00:00+00:00",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,493", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task AIRFLOW_CTX_DAG_RUN_ID=backfill_2020-07-14T00:00:00+00:00",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,493] {bash_operator.py:136} INFO -
Temporary script location: /tmp/airflowtmpo66_2jmc/bash-taskh1s3dumt",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,494", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,494] {bash_operator.py:146} INFO -
Running command: echo hi", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,502] {bash_operator.py:155} INFO -
Output:", "dag_id": "example_subdag_operator_1.section-2", "task_id":
"bash-task", "execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,503", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,503] {bash_operator.py:159} INFO - hi",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,504", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task [2020-07-16 19:59:05,504] {bash_operator.py:163} INFO -
Command exited with return code 0", "dag_id":
"example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,541", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task
/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning:
The psycopg2 wheel package will be renamed from release 2.8; in order to keep
installing from binary please use \"pip install psycopg2-binary\" instead. For
details see:
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:05,542", "filename":
"base_task_runner.py", "lineno": 115, "levelname": "INFO", "message": "Job 10:
Subtask bash-task \"\"\")", "dag_id": "example_subdag_operator_1.section-2",
"task_id": "bash-task", "execution_date": "2020_07_14T00_00_00_000000",
"try_number": "1"}
Job 8: Subtask section-2 {"asctime": "2020-07-16 19:59:08,672", "filename":
"logging_mixin.py", "lineno": 95, "levelname": "INFO", "message": "[2020-07-16
19:59:08,672] {local_task_job.py:105} INFO - Task exited with return code 0",
"dag_id": "example_subdag_operator_1.section-2", "task_id": "bash-task",
"execution_date": "2020_07_14T00_00_00_000000", "try_number": "1"}
Job 8: Subtask section-2
end_of_log/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144:
UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in
order to keep installing from binary please use "pip install psycopg2-binary"
instead. For details see:
<http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
Job 8: Subtask section-2 """)
```
The logs for all the tasks in the SubDAG appear in the log of the
SubDagOperator task (in the parent DAG).
https://github.com/apache/airflow/blob/master/airflow/task/task_runner/base_task_runner.py#L124-L141
- When a subprocess is used to run the task, all its logs are printed to
stdout and gathered by the log-reader thread.
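The mechanism can be reproduced in a minimal sketch (the child command and
the `Job 8: Subtask section-2` prefix are hypothetical stand-ins for
`airflow run ... --raw` and the runner's prefix): the child's stdout is read
line by line and re-emitted through the parent's logger, so everything a
subtask writes to stdout, including its own subtasks' logs, lands in the
parent task's log.

```python
import logging
import subprocess
import sys
import threading

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("parent-task")
captured_lines = []

def _read_task_logs(stream, prefix):
    # Re-emit every stdout line of the child through the parent's logger,
    # mirroring what base_task_runner's log-reader thread does. This is why
    # subtask output shows up inside the SubDagOperator task's log.
    for raw in iter(stream.readline, b""):
        line = raw.decode().rstrip()
        captured_lines.append(line)
        log.info("%s %s", prefix, line)
    stream.close()

# Hypothetical child command standing in for `airflow run ... --raw`
proc = subprocess.Popen(
    [sys.executable, "-c", "print('subtask log line')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
reader = threading.Thread(
    target=_read_task_logs, args=(proc.stdout, "Job 8: Subtask section-2")
)
reader.start()
proc.wait()
reader.join()
```

With `write_stdout=True`, everything the parent logger emits goes to stdout
as well, so the nested runner's output is captured again one level up.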
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Enhance current ES handler with stdout capability and more output options
> -------------------------------------------------------------------------
>
> Key: AIRFLOW-3370
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3370
> Project: Apache Airflow
> Issue Type: Improvement
> Components: celery, logging, worker
> Affects Versions: 1.10.0
> Reporter: Robert Hwang
> Assignee: Andrii Soldatenko
> Priority: Major
> Fix For: 1.10.4
>
>
> Currently, the ES handler in 1.10 can only search from ES; it cannot write
> to it. Two possible enhancements are to allow the handler a "write to
> standard out" functionality, and to make log messages more remote-storage
> friendly, with an optional JSON-formatted message and the regular
> pretty-print log format as the default.
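As a rough illustration of the requested enhancement (a sketch only, not the
shipped handler; field names are copied from the JSON log sample above), a
stdout handler with an optional JSON formatter could look like:

```python
import json
import logging
import sys

class JsonFormatter(logging.Formatter):
    # Sketch only: field names mirror the log sample above
    # (asctime, filename, lineno, levelname, message).
    def format(self, record):
        return json.dumps({
            "asctime": self.formatTime(record),
            "filename": record.filename,
            "lineno": record.lineno,
            "levelname": record.levelname,
            "message": record.getMessage(),
        })

# The "write to standard out" option: a plain StreamHandler on sys.stdout,
# so a log shipper can forward each line to a remote store such as ES.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonFormatter())
log = logging.getLogger("airflow.task")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("Task exited with return code 0")
```

Leaving the formatter off would keep the regular pretty-print format as the
default.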
--
This message was sent by Atlassian Jira
(v8.3.4#803005)