rafidka edited a comment on issue #13824:
URL: https://github.com/apache/airflow/issues/13824#issuecomment-1055908220


   I just tried Airflow 2.2.4 and it is working fine for me. On Airflow 2.2.4:
   
   ```
   [2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:04:03.337259+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
   [2022-03-01 14:05:19,784: INFO/ForkPoolWorker-16] Celery task ID: fd4cedea-bbaa-4d92-a536-e19aa0dfc34d
   [2022-03-01 14:05:19,831: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
   [2022-03-01 14:05:19,889: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:04:03.337259+00:00 [queued]> on host 000464fbb146
   [2022-03-01 14:05:21,598: INFO/ForkPoolWorker-16] Task airflow.executors.celery_executor.execute_command[fd4cedea-bbaa-4d92-a536-e19aa0dfc34d] succeeded in 1.8837955820199568s: None
   ```
   
   On Airflow 2.2.3:
   
   ```
   [2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'print', 'execute_fn', 'scheduled__2022-03-01T22:06:39.485677+00:00', '--local', '--subdir', 'DAGS_FOLDER/print.py']
   [2022-03-01 14:07:55,928: INFO/ForkPoolWorker-16] Celery task ID: c646cb26-e28a-4b33-8346-0e4ca4060232
   [2022-03-01 14:07:55,978: INFO/ForkPoolWorker-16] Filling up the DagBag from /root/airflow/dags/print.py
   [2022-03-01 14:07:56,049: WARNING/ForkPoolWorker-16] Running <TaskInstance: print.execute_fn scheduled__2022-03-01T22:06:39.485677+00:00 [queued]> on host e2196b74de7f
   [2022-03-01 14:08:01,439: ERROR/ForkPoolWorker-16] Failed to execute task Task received SIGTERM signal.
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/local_task_job.py", line 103, in _execute
       self.task_runner.start()
     File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 97, in _start_by_fork
       logging.shutdown()
     File "/usr/lib64/python3.7/logging/__init__.py", line 2036, in shutdown
       h.flush()
     File "/usr/local/lib/python3.7/site-packages/watchtower/__init__.py", line 297, in flush
       q.join()
     File "/usr/lib64/python3.7/queue.py", line 89, in join
       self.all_tasks_done.wait()
     File "/usr/lib64/python3.7/threading.py", line 296, in wait
       waiter.acquire()
     File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1410, in signal_handler
       raise AirflowException("Task received SIGTERM signal")
   airflow.exceptions.AirflowException: Task received SIGTERM signal
   ```
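   
   What the traceback suggests: `logging.shutdown()` calls the watchtower handler's `flush()`, which blocks on `q.join()` until the task is SIGTERMed. Here is a minimal sketch of that failure mode; the `QueueBackedHandler` class below is hypothetical (not watchtower's actual implementation), but it shows why a `flush()` built on `queue.Queue.join()` can hang after a `fork()`, where only the forking thread survives in the child:
   
   ```python
   import logging
   import queue
   import threading
   
   class QueueBackedHandler(logging.Handler):
       """Hypothetical handler mimicking a queue-draining log shipper."""
   
       def __init__(self):
           super().__init__()
           self.q = queue.Queue()
           # A daemon thread drains the queue. After fork(), this thread
           # does NOT exist in the child process, so nothing ever calls
           # task_done() there.
           self._worker = threading.Thread(target=self._drain, daemon=True)
           self._worker.start()
   
       def _drain(self):
           while True:
               self.q.get()       # pretend to ship the record somewhere
               self.q.task_done()
   
       def emit(self, record):
           self.q.put(record)
   
       def flush(self):
           # Blocks until every queued record has been task_done()'d.
           # With no worker thread alive, this waits forever -- until an
           # external SIGTERM interrupts it, as in the traceback above.
           self.q.join()
   ```
   
   With the worker thread alive, `flush()` returns as soon as the queue drains; in a forked child where the worker was never re-created, the same call blocks indefinitely inside `logging.shutdown()`, which is consistent with the stack trace above.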
   
   For reference, this is the DAG I am testing with:
   
   ```python
   import os
   from datetime import datetime, timedelta
   
   from airflow.decorators import dag, task
   from airflow.utils.dates import days_ago
   
   NUM_LINES = 10000
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @dag(dag_id=DAG_ID, schedule_interval=timedelta(minutes=1), catchup=False, start_date=days_ago(0), tags=['test'])
   def print_dag():
       @task()
       def execute_fn():
           # Print a timestamp NUM_LINES times to generate log volume.
           for i in range(NUM_LINES):
               print(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])
   
       execute_fn()
   
   test_dag_d = print_dag()
   ```
   

