kacpermuda opened a new pull request, #47449:
URL: https://github.com/apache/airflow/pull/47449
The overtime mechanism for auxiliary processes (such as listeners) after
task success was introduced in #39890. However, when it kicks in, the resulting
stack trace is not user-friendly, and it is unclear from the logs what actually
happened. This PR does not modify the stack trace itself, since the termination
mechanism is broader than just this specific case. Instead, I've added a warning
message that explains why the process was killed when the overtime mechanism is
the cause, with guidance on how to prevent it.
<details>
<summary>
unfriendly stacktrace
</summary>
```
[2025-03-06T08:45:20.648+0000] {local_task_job_runner.py:346} WARNING - State of this instance has been externally set to success. Terminating instance.
[2025-03-06T08:45:20.649+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
[2025-03-06T08:45:20.652+0000] {process_utils.py:132} INFO - Sending 15 to group 503. PIDs of all processes in the group: [836, 503]
[2025-03-06T08:45:20.652+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 503
[2025-03-06T08:45:20.653+0000] {taskinstance.py:3094} ERROR - Received SIGTERM. Terminating subprocesses.
[2025-03-06T08:45:20.665+0000] {taskinstance.py:3095} ERROR - Stacktrace:
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 56, in scheduler
    run_command_with_daemon_option(
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 47, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.12/site-packages/astronomer/airflow/version_check/plugin.py", line 38, in run_before
    fn(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 990, in _execute
    executor.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 392, in start
    self.impl.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 330, in start
    worker.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 80, in run
    return super().run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 211, in do_work
    self.execute_work(key=key, command=command)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 100, in execute_work
    state = self._execute_work_in_fork(command)
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", line 142, in _execute_work_in_fork
    args.func(args)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 322, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/local_task_job_runner.py", line 171, in _execute
    self.task_runner.start()
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 55, in start
    self.process = self._start_by_fork()
  File "/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py", line 117, in _start_by_fork
    ret = args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 483, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 256, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
  File "/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", line 341, in _run_raw_task
    return ti._run_raw_task(
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3006, in _run_raw_task
    return _run_raw_task(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 368, in _run_raw_task
    get_listener_manager().hook.on_task_instance_success(
  File "/usr/local/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, in __call__
    return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
  File "/usr/local/lib/python3.12/site-packages/pluggy/_manager.py", line 120, in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  File "/usr/local/lib/python3.12/site-packages/pluggy/_callers.py", line 103, in _multicall
    res = hook_impl.function(*args)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 259, in on_task_instance_success
    self._on_task_instance_success(task_instance, task.dag, task_instance.dag_run, task)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 326, in _on_task_instance_success
    self._execute(on_success, "on_success", use_fork=True)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 452, in _execute
    self._fork_execute(callable, callable_name)
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py", line 472, in _fork_execute
    process.wait(conf.execution_timeout())
  File "/usr/local/lib/python3.12/site-packages/psutil/__init__.py", line 1355, in wait
    self._exitcode = self._proc.wait(timeout)
  File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1717, in wrapper
    return fun(self, *args, **kwargs)
  File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 1949, in wait
    return _psposix.wait_pid(self.pid, timeout, self._name)
  File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 146, in wait_pid
    interval = sleep(interval)
  File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 124, in sleep
    _sleep(interval)
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 3095, in signal_handler
    self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
[2025-03-06T08:45:22.020+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=503, status='terminated', exitcode=0, started='08:43:49') (503) terminated with exit code 0
[2025-03-06T08:45:22.022+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=836, status='terminated', started='08:44:54') (836) terminated with exit code None
```
</details>
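For context, the forked listener wait visible in the trace (`process.wait(conf.execution_timeout())`) is bounded by the OpenLineage provider's execution timeout. If that is the knob in play, it can be raised via Airflow config; the exact option name and default should be verified against the installed provider's configuration reference:

```
[openlineage]
# Maximum seconds the listener's forked process may run before termination
# (option name per the OpenLineage provider docs; verify for your version).
execution_timeout = 30
```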