rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055908220
I just Airflow 2.2.4 and it is working fine for me. On Airflow 2.2.4:
```
[2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Executing command in
Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn',
'scheduled__2022-03-01T22:04:03.337259+00:00', '--local', '--subdir',
'DAGS_FOLDER/print.py']
[2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Celery task ID:
fd4cedea-bbaa-4d92-a536-e19aa0dfc34d
[2022-03-01 14:05:19,831: INFO/ForkPoolWorker-16] Filling up the DagBag from
/root/airflow/dags/print.py
[2022-03-01 14:05:19,889: WARNING/ForkPoolWorker-16] Running <TaskInstance:
print.execute_fn scheduled__2022-03-01T22:04:03.337259+00:00 [queued]> on host
000464fbb146
[2022-03-01 14:05:21,598: INFO/ForkPoolWorker-16] Task
airflow.executors.celery_executor.execute_command[fd4cedea-bbaa-4d92-a536-e19aa0dfc34d]
succeeded in 1.8837955820199568s: None
```
On Airflow 2.2.3:
```
[2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Executing command in
Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn',
'scheduled__2022-03-01T22:06:39.485677+00:00', '--local', '--subdir',
'DAGS_FOLDER/print.py']
[2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Celery task ID:
c646cb26-e28a-4b33-8346-0e4ca4060232
[2022-03-01 14:07:55,978: INFO/ForkPoolWorker-16] Filling up the DagBag from
/root/airflow/dags/print.py
[2022-03-01 14:07:56,049: WARNING/ForkPoolWorker-16] Running <TaskInstance:
print.execute_fn scheduled__2022-03-01T22:06:39.485677+00:00 [queued]> on host
e2196b74de7f
[2022-03-01 14:08:01,439: ERROR/ForkPoolWorker-16] Failed to execute task
Task received SIGTERM signal.
Traceback (most recent call last):
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py",
line 121, in _execute_in_fork
args.func(args)
File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line
92, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py",
line 298, in task_run
_run_task_by_selected_method(args, dag, ti)
File
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py",
line 105, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py",
line 163, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
line 245, in run
self._execute()
File
"/usr/local/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line
103, in _execute
self.task_runner.start()
File
"/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py",
line 41, in start
self.process = self._start_by_fork()
File
"/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py",
line 97, in _start_by_fork
logging.shutdown()
File "/usr/lib64/python3.7/logging/__init__.py", line 2036, in shutdown
h.flush()
File "/usr/local/lib/python3.7/site-packages/watchtower/__init__.py", line
297, in flush
q.join()
File "/usr/lib64/python3.7/queue.py", line 89, in join
self.all_tasks_done.wait()
File "/usr/lib64/python3.7/threading.py", line 296, in wait
waiter.acquire()
File
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line
1410, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
```
For reference, this is the DAG I am testing with:
```python
from datetime import timedelta
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
import os
from datetime import datetime
NUM_LINES = 10000
DAG_ID = os.path.basename(__file__).replace(".py", "")
@dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False,
start_date=days_ago(0), tags=['test'])
def print_dag():
@task()
def execute_fn():
for i in range(0, NUM_LINES):
print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
execute_fn_t = execute_fn()
test_dag_d = print_dag()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]