kacpermuda opened a new pull request, #47449:
URL: https://github.com/apache/airflow/pull/47449

   The overtime mechanism for auxiliary processes (such as listeners) after 
task success was introduced in #39890. However, when it terminates a process, 
the resulting stack trace is not very user-friendly and leaves it unclear what 
actually happened. This PR does not modify the stack trace itself, as the 
termination mechanism is broader than just this specific case. Instead, I’ve 
added a warning message that explains why the process was killed when the 
overtime mechanism is the cause and provides guidance on how to prevent it.
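
   For readers unfamiliar with the idea, here is a minimal, standalone sketch 
of the kind of check described above. It is not the code from this PR and does 
not use Airflow's API: the function names, the 20-second threshold, and the 
message wording are all assumptions made for illustration.

   ```python
   import logging
   from datetime import datetime, timedelta, timezone

   log = logging.getLogger("overtime_sketch")

   # Hypothetical limit in seconds; stands in for whatever overtime setting
   # the deployment actually uses.
   OVERTIME_THRESHOLD_SECONDS = 20


   def overtime_exceeded(success_time, now, threshold_seconds=OVERTIME_THRESHOLD_SECONDS):
       """Return True when more than `threshold_seconds` have passed since task success."""
       return (now - success_time) > timedelta(seconds=threshold_seconds)


   def explain_termination(success_time, now, threshold_seconds=OVERTIME_THRESHOLD_SECONDS):
       """Log a warning and return its text if overtime is the cause, else return None."""
       if not overtime_exceeded(success_time, now, threshold_seconds):
           # Termination has some other cause; stay silent so the message is not misleading.
           return None
       elapsed = (now - success_time).total_seconds()
       message = (
           f"Task succeeded {elapsed:.0f}s ago, but auxiliary processes (such as "
           f"listeners) are still running past the {threshold_seconds}s overtime "
           "limit, so the process is being terminated. Consider raising the limit "
           "or reducing the work done after task success."
       )
       log.warning(message)
       return message


   if __name__ == "__main__":
       logging.basicConfig(level=logging.WARNING)
       succeeded_at = datetime(2025, 3, 6, 8, 45, 0, tzinfo=timezone.utc)
       explain_termination(succeeded_at, succeeded_at + timedelta(seconds=21))
   ```

   The point of the sketch is that the warning is emitted only when the 
overtime limit is the reason for termination, so other termination paths keep 
their existing log output.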
   
   <details>
   <summary>
   unfriendly stacktrace
   </summary>
   
   ```
   [2025-03-06T08:45:20.648+0000] {local_task_job_runner.py:346} WARNING - 
State of this instance has been externally set to success. Terminating instance.
   [2025-03-06T08:45:20.649+0000] {local_task_job_runner.py:245} INFO - 
::endgroup::
   [2025-03-06T08:45:20.652+0000] {process_utils.py:132} INFO - Sending 15 to 
group 503. PIDs of all processes in the group: [836, 503]
   [2025-03-06T08:45:20.652+0000] {process_utils.py:87} INFO - Sending the 
signal 15 to group 503
   [2025-03-06T08:45:20.653+0000] {taskinstance.py:3094} ERROR - Received 
SIGTERM. Terminating subprocesses.
   [2025-03-06T08:45:20.665+0000] {taskinstance.py:3095} ERROR - Stacktrace: 
     File "/usr/local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/usr/local/lib/python3.12/site-packages/airflow/__main__.py", line 
62, in main
       args.func(args)
     File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 56, in scheduler
       run_command_with_daemon_option(
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", 
line 86, in run_command_with_daemon_option
       callback()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 59, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 47, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/usr/local/lib/python3.12/site-packages/astronomer/airflow/version_check/plugin.py",
 line 38, in run_before
       fn(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", 
line 990, in _execute
       executor.start()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 392, in start
       self.impl.start()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 330, in start
       worker.start()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in 
start
       self._popen = self._Popen(self)
     File "/usr/local/lib/python3.12/multiprocessing/context.py", line 224, in 
_Popen
       return _default_context.get_context().Process._Popen(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in 
_Popen
       return Popen(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, 
in __init__
       self._launch(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, 
in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in 
_bootstrap
       self.run()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 80, in run
       return super().run()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in 
run
       self._target(*self._args, **self._kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 211, in do_work
       self.execute_work(key=key, command=command)
     File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 100, in execute_work
       state = self._execute_work_in_fork(command)
     File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/executors/local_executor.py", 
line 142, in _execute_work_in_fork
       args.func(args)
     File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 483, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 254, in _run_task_by_selected_method
       return _run_task_by_local_task_job(args, ti)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 322, in _run_task_by_local_task_job
       ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/usr/local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/jobs/local_task_job_runner.py",
 line 171, in _execute
       self.task_runner.start()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 55, in start
       self.process = self._start_by_fork()
     File 
"/usr/local/lib/python3.12/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 117, in _start_by_fork
       ret = args.func(args, dag=self.dag)
     File "/usr/local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 483, in task_run
       task_return_code = _run_task_by_selected_method(args, _dag, ti)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 256, in _run_task_by_selected_method
       return _run_raw_task(args, ti)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/cli/commands/task_command.py", 
line 341, in _run_raw_task
       return ti._run_raw_task(
     File "/usr/local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 
3006, in _run_raw_task
       return _run_raw_task(
     File 
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 
368, in _run_raw_task
       get_listener_manager().hook.on_task_instance_success(
     File "/usr/local/lib/python3.12/site-packages/pluggy/_hooks.py", line 513, 
in __call__
       return self._hookexec(self.name, self._hookimpls.copy(), kwargs, 
firstresult)
     File "/usr/local/lib/python3.12/site-packages/pluggy/_manager.py", line 
120, in _hookexec
       return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
     File "/usr/local/lib/python3.12/site-packages/pluggy/_callers.py", line 
103, in _multicall
       res = hook_impl.function(*args)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py",
 line 259, in on_task_instance_success
       self._on_task_instance_success(task_instance, task.dag, 
task_instance.dag_run, task)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py",
 line 326, in _on_task_instance_success
       self._execute(on_success, "on_success", use_fork=True)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py",
 line 452, in _execute
       self._fork_execute(callable, callable_name)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py",
 line 472, in _fork_execute
       process.wait(conf.execution_timeout())
     File "/usr/local/lib/python3.12/site-packages/psutil/__init__.py", line 
1355, in wait
       self._exitcode = self._proc.wait(timeout)
     File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 
1717, in wrapper
       return fun(self, *args, **kwargs)
     File "/usr/local/lib/python3.12/site-packages/psutil/_pslinux.py", line 
1949, in wait
       return _psposix.wait_pid(self.pid, timeout, self._name)
     File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 
146, in wait_pid
       interval = sleep(interval)
     File "/usr/local/lib/python3.12/site-packages/psutil/_psposix.py", line 
124, in sleep
       _sleep(interval)
     File 
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 
3095, in signal_handler
       self.log.error("Stacktrace: \n%s", "".join(traceback.format_stack()))
   [2025-03-06T08:45:22.020+0000] {process_utils.py:80} INFO - Process 
psutil.Process(pid=503, status='terminated', exitcode=0, started='08:43:49') 
(503) terminated with exit code 0
   [2025-03-06T08:45:22.022+0000] {process_utils.py:80} INFO - Process 
psutil.Process(pid=836, status='terminated', started='08:44:54') (836) 
terminated with exit code None
   
   ```
   
   </details>
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
