easontm commented on issue #20992:
URL: https://github.com/apache/airflow/issues/20992#issuecomment-1056148016


   Here's my data point. If there are other process logs that matter, please let me know.
   Airflow version: 2.2.3
   Executor: CeleryKubernetes
   Deployment pattern: Other docker-based
   - Using HA scheduling (2 schedulers)
   - 6 Celery workers
   - RabbitMQ as the message queue (3 replicas, RabbitMQClusterOperator)
   
   The task in question is a TimeDeltaSensor. After this failure, the Celery worker kept running and has since processed other tasks, and this task itself was completed by a different Celery worker. I'm not setting `default_impersonation`. This issue is also intermittent for me; I cannot reproduce it on demand.
   
   Task definition (the default args applied to it only concern scheduling, emails, and retries):
   ```python
   from datetime import timedelta
   from airflow.sensors.time_delta import TimeDeltaSensor

   my_sensor = TimeDeltaSensor(
       task_id="my_sensor", delta=timedelta(minutes=30), dag=dag
   )
   ```
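For reference, the target time printed in the task log (`2022-03-01T16:30:00+00:00`) appears consistent with the sensor measuring `delta` from the end of the run's data interval rather than from the logical date. The sketch below just reproduces that arithmetic; it assumes an hourly schedule (so the `scheduled__2022-03-01T15:00:00+00:00` run's interval ends at 16:00), and the variable names are illustrative only.

```python
from datetime import datetime, timedelta, timezone

# Assumption: hourly schedule, so the run labelled 15:00 covers 15:00-16:00 UTC.
logical_date = datetime(2022, 3, 1, 15, 0, tzinfo=timezone.utc)
data_interval_end = logical_date + timedelta(hours=1)

# `delta` from the task definition above.
delta = timedelta(minutes=30)
target = data_interval_end + delta

# The only poke in the task log happened at 16:00:05 UTC.
first_poke = datetime(2022, 3, 1, 16, 0, 5, tzinfo=timezone.utc)

print(target.isoformat())   # 2022-03-01T16:30:00+00:00
print(first_poke > target)  # False: the sensor's condition was not yet met
```

Under that assumption, the sensor was still waiting on its condition when it received SIGTERM at 16:00:10.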
   
   Celery worker log:
   <details>
   
   ```
   [2022-03-01 16:00:07,634: WARNING/ForkPoolWorker-5] Running <TaskInstance: MY_DAG.MY_TASK scheduled__2022-03-01T15:00:00+00:00 [queued]> on host airflow-celery-worker-6bb54c97cb-gks65
   [2022-03-01 16:00:10,831: ERROR/ForkPoolWorker-4] Failed to execute task PID of job runner does not match.
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/dist-packages/airflow/executors/celery_executor.py", line 121, in _execute_in_fork
       args.func(args)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 105, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/usr/local/lib/python3.7/dist-packages/airflow/cli/commands/task_command.py", line 163, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 134, in _execute
       self.heartbeat()
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/base_job.py", line 226, in heartbeat
       self.heartbeat_callback(session=session)
     File "/usr/local/lib/python3.7/dist-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/dist-packages/airflow/jobs/local_task_job.py", line 209, in heartbeat_callback
       raise AirflowException("PID of job runner does not match")
   airflow.exceptions.AirflowException: PID of job runner does not match
   ```
   
   </details>
   
   Slightly abbreviated task logs:
   <details>
   
   ```
   --------------------------------------------------------------------------------
   [2022-03-01, 16:00:05 UTC] {taskinstance.py:1239} INFO - Starting attempt 1 of 3
   [2022-03-01, 16:00:05 UTC] {taskinstance.py:1240} INFO - 
   --------------------------------------------------------------------------------
   [2022-03-01, 16:00:05 UTC] {taskinstance.py:1259} INFO - Executing <Task(TimeDeltaSensor): MY_TASK> on 2022-03-01 15:00:00+00:00
   [2022-03-01, 16:00:05 UTC] {standard_task_runner.py:52} INFO - Started process 5939 to run task
   [2022-03-01, 16:00:05 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'MY_DAG', 'MY_TASK', 'scheduled__2022-03-01T15:00:00+00:00', '--job-id', '6384572', '--raw', '--subdir', 'DAGS_FOLDER/MY_DAG.py', '--cfg-path', '/tmp/tmp84eym25r', '--error-file', '/tmp/tmpyb039unj']
   [2022-03-01, 16:00:05 UTC] {standard_task_runner.py:77} INFO - Job 6384572: Subtask MY_TASK
   [2022-03-01, 16:00:05 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: MY_DAG.MY_TASK scheduled__2022-03-01T15:00:00+00:00 [running]> on host airflow-celery-worker-6bb54c97cb-gks65
   [2022-03-01, 16:00:05 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=MY_OWNER
   AIRFLOW_CTX_DAG_ID=MY_DAG
   AIRFLOW_CTX_TASK_ID=MY_TASK
   AIRFLOW_CTX_EXECUTION_DATE=2022-03-01T15:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-03-01T15:00:00+00:00
   [2022-03-01, 16:00:05 UTC] {time_delta.py:39} INFO - Checking if the time (2022-03-01T16:30:00+00:00) has come
   [2022-03-01, 16:00:10 UTC] {local_task_job.py:207} WARNING - Recorded pid 5949 does not match the current pid 5939
   [2022-03-01, 16:00:10 UTC] {process_utils.py:124} INFO - Sending Signals.SIGTERM to group 5939. PIDs of all processes in the group: [5939]
   [2022-03-01, 16:00:10 UTC] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 5939
   [2022-03-01, 16:00:10 UTC] {taskinstance.py:1408} ERROR - Received SIGTERM. Terminating subprocesses.
   
   ...
   usual stacktrace
   ```
   
   </details>
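The warning `Recorded pid 5949 does not match the current pid 5939` suggests the heartbeat compares the PID recorded for the task instance with the PID of the process the task runner started (5939, per `Started process 5939 to run task`), and raises when they differ. The sketch below is a simplified, hypothetical model of that comparison, not Airflow's actual code: `RecordedTaskInstance`, `check_runner_pid`, and `PidMismatchError` are made-up names, and it assumes no impersonation (`run_as_user` / `default_impersonation` unset), since impersonation presumably changes which PID gets compared.

```python
from dataclasses import dataclass
from typing import Optional


class PidMismatchError(Exception):
    """Hypothetical stand-in for the AirflowException raised in the traceback."""


@dataclass
class RecordedTaskInstance:
    """Hypothetical stand-in for the task-instance row read back on heartbeat."""

    pid: Optional[int]  # PID last recorded for the running task


def check_runner_pid(ti: RecordedTaskInstance, runner_pid: int) -> None:
    """Simplified model of the PID check behind the warning in the task log.

    Assumes no impersonation, so the recorded PID is compared directly with
    the PID of the process the task runner started.
    """
    if ti.pid is None:
        # Nothing recorded yet, so there is nothing to compare against.
        return
    if ti.pid != runner_pid:
        raise PidMismatchError(
            f"Recorded pid {ti.pid} does not match the current pid {runner_pid}"
        )


# Values taken from the task log above.
try:
    check_runner_pid(RecordedTaskInstance(pid=5949), runner_pid=5939)
except PidMismatchError as exc:
    print(exc)  # Recorded pid 5949 does not match the current pid 5939
```

In this model the task is killed as soon as the stored PID differs from the runner's; here the stored value (5949) is not the process the runner reported starting (5939).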