casra-developers commented on code in PR #29935:
URL: https://github.com/apache/airflow/pull/29935#discussion_r1127596661
##########
airflow/jobs/local_task_job.py:
##########
@@ -263,7 +263,7 @@ def heartbeat_callback(self, session=None):
recorded_pid = psutil.Process(ti.pid).ppid()
same_process = recorded_pid == current_pid
- if recorded_pid is not None and not same_process:
+ if not IS_WINDOWS and recorded_pid is not None and not same_process:
Review Comment:
It does not happen every time, but the work appears to be handed off to a
different process, so the recorded PID no longer matches the current one.
I've added both PIDs to the exception message for clarity:
```
Traceback (most recent call last):
  File "C:\Program Files\Python 3.10\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Program Files\Python 3.10\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "C:\Program Files\Python 3.10\Scripts\airflow.exe\__main__.py", line 7, in <module>
    sys.exit(main())
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\__main__.py", line 39, in main
    args.func(args)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\utils\cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 395, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 193, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 252, in _run_task_by_local_task_job
    run_job.run()
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\base_job.py", line 258, in run
    self._execute()
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\local_task_job.py", line 184, in _execute
    self.heartbeat()
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\base_job.py", line 239, in heartbeat
    self.heartbeat_callback(session=session)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\utils\session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\local_task_job.py", line 259, in heartbeat_callback
    raise AirflowException(f"PID of job runner does not match ({recorded_pid}(recorded)!={current_pid}(current))")
airflow.exceptions.AirflowException: PID of job runner does not match (5644(recorded)!=5008(current))
2023-03-07 10:37:49,796 - distributed.worker - WARNING - Compute Failed
Key: check_call-56d36840-6c36-4a43-b74c-88a1f05f194d
Function: check_call
args: (['airflow', 'tasks', 'run', 'crash_airflow_worker_dag', 'step1', 'manual__2023-03-07T09:37:32.252892+00:00', '--local', '--subdir', 'DAGS_FOLDER/crash_worker_airflow.py'])
kwargs: {}
Exception: "CalledProcessError(1, ['airflow', 'tasks', 'run', 'crash_airflow_worker_dag', 'step1', 'manual__2023-03-07T09:37:32.252892+00:00', '--local', '--subdir', 'DAGS_FOLDER/crash_worker_airflow.py'])"
```
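To make the effect of the proposed guard concrete, here is a minimal standalone sketch of the check, not the actual Airflow code. The helper names `pid_mismatch` and `describe_mismatch` are hypothetical, and `IS_WINDOWS` is defined locally via `platform.system()` as an assumption about how such a flag could be computed; only the condition and the message format are taken from the diff and traceback above.

```python
import platform

# Assumption: a module-level flag comparable to the IS_WINDOWS name in the diff.
IS_WINDOWS = platform.system() == "Windows"


def pid_mismatch(recorded_pid, current_pid, is_windows=IS_WINDOWS):
    """Return True when the heartbeat should treat the PIDs as a mismatch.

    Hypothetical helper mirroring the patched condition:
    `not IS_WINDOWS and recorded_pid is not None and not same_process`.
    """
    same_process = recorded_pid == current_pid
    return (not is_windows) and recorded_pid is not None and not same_process


def describe_mismatch(recorded_pid, current_pid):
    """Build the message format shown in the traceback (hypothetical helper)."""
    return (
        "PID of job runner does not match "
        f"({recorded_pid}(recorded)!={current_pid}(current))"
    )


if __name__ == "__main__":
    # PIDs taken from the traceback above.
    recorded, current = 5644, 5008
    if pid_mismatch(recorded, current, is_windows=False):
        print(describe_mismatch(recorded, current))
    # With the guard active (Windows), the same PIDs no longer trigger the error.
    print(pid_mismatch(recorded, current, is_windows=True))
```

With the guard in place, the 5644 vs. 5008 case from the traceback would pass silently on Windows. That also means the check is skipped entirely on that platform, which is the trade-off under discussion here.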