casra-developers commented on code in PR #29935:
URL: https://github.com/apache/airflow/pull/29935#discussion_r1127596661


##########
airflow/jobs/local_task_job.py:
##########
@@ -263,7 +263,7 @@ def heartbeat_callback(self, session=None):
                 recorded_pid = psutil.Process(ti.pid).ppid()
                 same_process = recorded_pid == current_pid
 
-            if recorded_pid is not None and not same_process:
+            if not IS_WINDOWS and recorded_pid is not None and not same_process:

Review Comment:
   The error does not occur on every run, but when it does, it seems that the
   work has been passed to a different process. I've added both PIDs to the
   exception message for clarity:
   
   ```
   Traceback (most recent call last):
     File "C:\Program Files\Python 3.10\lib\runpy.py", line 196, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File "C:\Program Files\Python 3.10\lib\runpy.py", line 86, in _run_code
       exec(code, run_globals)
     File "C:\Program Files\Python 3.10\Scripts\airflow.exe\__main__.py", line 7, in <module>
       sys.exit(main())
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\__main__.py", line 39, in main
       args.func(args)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\utils\cli.py", line 108, in wrapper
       return f(*args, **kwargs)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 395, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 193, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\cli\commands\task_command.py", line 252, in _run_task_by_local_task_job
       run_job.run()
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\base_job.py", line 258, in run
       self._execute()
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\local_task_job.py", line 184, in _execute
       self.heartbeat()
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\base_job.py", line 239, in heartbeat
       self.heartbeat_callback(session=session)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\utils\session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "C:\Program Files\Python 3.10\lib\site-packages\airflow\jobs\local_task_job.py", line 259, in heartbeat_callback
       raise AirflowException(f"PID of job runner does not match ({recorded_pid}(recorded)!={current_pid}(current))")
   airflow.exceptions.AirflowException: PID of job runner does not match (5644(recorded)!=5008(current))
   2023-03-07 10:37:49,796 - distributed.worker - WARNING - Compute Failed
   Key:       check_call-56d36840-6c36-4a43-b74c-88a1f05f194d
   Function:  check_call
   args:      (['airflow', 'tasks', 'run', 'crash_airflow_worker_dag', 'step1', 'manual__2023-03-07T09:37:32.252892+00:00', '--local', '--subdir', 'DAGS_FOLDER/crash_worker_airflow.py'])
   kwargs:    {}
   Exception: "CalledProcessError(1, ['airflow', 'tasks', 'run', 'crash_airflow_worker_dag', 'step1', 'manual__2023-03-07T09:37:32.252892+00:00', '--local', '--subdir', 'DAGS_FOLDER/crash_worker_airflow.py'])"
   ```
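
The guarded check from the diff, together with the extended exception message seen in the traceback, can be sketched in isolation as follows. This is only a minimal sketch under stated assumptions, not the Airflow implementation: `check_runner_pid` and `PidMismatchError` are hypothetical names (the latter stands in for `AirflowException`), and `IS_WINDOWS` is defined locally via `platform.system()` on the assumption that the PR's constant behaves the same way.

```python
import platform

# Assumption: the PR's IS_WINDOWS flag is equivalent to this check.
IS_WINDOWS = platform.system() == "Windows"


class PidMismatchError(RuntimeError):
    """Hypothetical stand-in for AirflowException in this sketch."""


def check_runner_pid(recorded_pid, current_pid, is_windows=IS_WINDOWS):
    """Raise if the recorded PID differs from the current one.

    Mirrors the condition in the diff: the check is skipped on Windows
    and when no PID has been recorded yet.
    """
    same_process = recorded_pid == current_pid
    if not is_windows and recorded_pid is not None and not same_process:
        # Include both PIDs so a mismatch can be diagnosed from the log.
        raise PidMismatchError(
            "PID of job runner does not match "
            f"({recorded_pid}(recorded)!={current_pid}(current))"
        )
```

Called as `check_runner_pid(5644, 5008, is_windows=False)`, the sketch raises with the same message text as in the traceback above; with `is_windows=True` it returns without raising, which is the behaviour the changed line would give on Windows.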



