lidalei commented on issue #19635:
URL: https://github.com/apache/airflow/issues/19635#issuecomment-1020360043


   Hi @potiuk, we have seen a similar issue since we upgraded from Airflow 2.0.2 to Airflow 2.2.3, for example with a BashOperator:
   ```
   [2022-01-24, 16:18:18 UTC] {subprocess.py:93} INFO - Command exited with return code 0
   [2022-01-24, 16:18:18 UTC] {taskinstance.py:1267} INFO - Marking task as SUCCESS. dag_id=XXX, task_id=XXX, execution_date=20220124T120500, start_date=20220124T161701, end_date=20220124T161818
   [2022-01-24, 16:18:18 UTC] {local_task_job.py:211} WARNING - State of this instance has been externally set to success. Terminating instance.
   [2022-01-24, 16:18:18 UTC] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 553436. PIDs of all processes in the group: [553436]
   [2022-01-24, 16:18:18 UTC] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 553436
   [2022-01-24, 16:18:18 UTC] {taskinstance.py:1408} ERROR - Received SIGTERM. Terminating subprocesses.
   [2022-01-24, 16:18:18 UTC] {subprocess.py:99} INFO - Sending SIGTERM signal to process group
   [2022-01-24, 16:18:18 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=553436, status='terminated', exitcode=1, started='16:17:01') (553436) terminated with exit code 1
   ```
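   For context, the task itself is nothing exotic. Here is a minimal sketch of the kind of DAG it runs in; the dag/task names, the schedule and the bash command are placeholders, not our real workload:
   ```
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.bash import BashOperator

   # Placeholder DAG: a plain BashOperator running a command that takes
   # roughly a minute, matching the start/end times in the log above.
   with DAG(
       dag_id="example_dag",            # placeholder name
       start_date=datetime(2022, 1, 1),
       schedule_interval="5 12 * * *",  # placeholder schedule
       catchup=False,
   ) as dag:
       BashOperator(
           task_id="example_task",      # placeholder name
           bash_command="sleep 75",     # stands in for the real command
       )
   ```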
   We have also seen a related error:
   ```
   ProcessLookupError: [Errno 3] No such process
     File "airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "airflow/cli/commands/task_command.py", line 105, in 
_run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "airflow/cli/commands/task_command.py", line 163, in 
_run_task_by_local_task_job
       run_job.run()
     File "airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "airflow/jobs/local_task_job.py", line 103, in _execute
       self.task_runner.start()
     File "airflow/task/task_runner/standard_task_runner.py", line 41, in start
       self.process = self._start_by_fork()
     File "airflow/task/task_runner/standard_task_runner.py", line 96, in 
_start_by_fork
       Sentry.flush()
     File "airflow/sentry.py", line 188, in flush
       sentry_sdk.flush()
     File "threading.py", line 306, in wait
       gotit = waiter.acquire(True, timeout)
     File "airflow/models/taskinstance.py", line 1409, in signal_handler
       self.task.on_kill()
     File "airflow/operators/bash.py", line 193, in on_kill
       self.subprocess_hook.send_sigterm()
     File "airflow/hooks/subprocess.py", line 101, in send_sigterm
       os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
   ```
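   The second traceback suggests the subprocess (or its process group) is already gone by the time `on_kill` fires after the SIGTERM. Not proposing this as the Airflow fix, just a sketch of the kind of guard that would avoid the secondary ProcessLookupError in that situation; the helper name is made up, only the os.killpg/os.getpgid call mirrors the traceback:
   ```
   import os
   import signal


   def send_sigterm_to_group(pid: int) -> None:
       """Send SIGTERM to the process group of ``pid``, tolerating a child
       that has already exited (as in the traceback above)."""
       try:
           os.killpg(os.getpgid(pid), signal.SIGTERM)
       except ProcessLookupError:
           # The process (group) is already gone, e.g. the bash command
           # finished with return code 0 before the signal was sent, so
           # there is nothing left to terminate.
           pass
   ```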

