vizeit commented on issue #39717:
URL: https://github.com/apache/airflow/issues/39717#issuecomment-2211493420

   > I did a few tests with the new version 2.9.2 and have the following details from the logs.
   > 
   > **Configuration**
   > 
   > > Airflow version: 2.9.2
   > > Compute: GKE
   > > Executor: CeleryKubernetesExecutor
   > > AIRFLOW__CORE__PARALLELISM: 160
   > > AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
   > > AIRFLOW__CELERY__WORKER_CONCURRENCY: 60
   > > Worker replicas: 4
   > > Scheduler replicas: 2
   > > AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: 3600
   > 
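   > For reference, the effective values inside a scheduler or worker pod can be double-checked with a short snippet like the one below (an illustrative sketch; it assumes the standard AIRFLOW__SECTION__KEY env-var mapping and that the Celery provider is installed):
   > 
   > ```python
   > # Illustrative check of the effective configuration inside a scheduler or worker container.
   > from airflow.configuration import conf
   > 
   > print(conf.getint("core", "parallelism"))                # AIRFLOW__CORE__PARALLELISM
   > print(conf.getint("scheduler", "max_tis_per_query"))     # AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY
   > print(conf.getint("celery", "worker_concurrency"))       # AIRFLOW__CELERY__WORKER_CONCURRENCY
   > print(conf.getfloat("scheduler", "task_queued_timeout")) # AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT
   > ```
   > 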
   > I am running multiple instances of a DAG with dynamic task mapping that expands into hundreds of tasks (a minimal sketch of the mapping pattern follows the log excerpt below). The logs show that the task gets scheduled and queued (at 2024-07-05T00:01:59.683) but is not executed within the task queued timeout period, resulting in the reported error (at 2024-07-05T01:02:09.431).
   > 
   > ```
   > {
   >   "textPayload": "\t<TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [scheduled]>\u001b[0m",
   >   "insertId": "5fqi9x47wvvla4jh",
   >   "resource": {
   >     "type": "k8s_container",
   >     "labels": {
   >       "container_name": "airflow-scheduler",
   >       "namespace_name": "mynamespacedevdev",
   >       "location": "us-central1",
   >       "project_id": "mygcp-project",
   >       "cluster_name": "mygkecluster",
   >       "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
   >     }
   >   },
   >   "timestamp": "2024-07-05T00:01:50.234024733Z",
   >   "severity": "INFO",
   >   "labels": {
   >     "k8s-pod/release": "mygkecluster",
   >     "k8s-pod/component": "scheduler",
   >     "k8s-pod/pod-template-hash": "6b77fc67d",
   >     "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
   >     "k8s-pod/app": "airflow"
   >   },
   >   "logName": "projects/mygcp-project/logs/stdout",
   >   "receiveTimestamp": "2024-07-05T00:01:52.886420247Z"
   > }
   > 
   > {
   >   "textPayload": "[2024-07-05 00:01:59,683: INFO/ForkPoolWorker-49] Running <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> on host mygkecluster-worker-1.mygkecluster-worker.mynamespacedevdev.svc.cluster.local",
   >   "insertId": "4o989t3huassb59y",
   >   "resource": {
   >     "type": "k8s_container",
   >     "labels": {
   >       "container_name": "airflow-worker",
   >       "project_id": "mygcp-project",
   >       "location": "us-central1",
   >       "pod_name": "mygkecluster-worker-1",
   >       "namespace_name": "mynamespacedevdev",
   >       "cluster_name": "mygkecluster"
   >     }
   >   },
   >   "timestamp": "2024-07-05T00:01:59.683635856Z",
   >   "severity": "INFO",
   >   "labels": {
   >     "k8s-pod/apps_kubernetes_io/pod-index": "1",
   >     "k8s-pod/release": "mygkecluster",
   >     "k8s-pod/component": "worker",
   >     "k8s-pod/app": "airflow",
   >     "k8s-pod/statefulset_kubernetes_io/pod-name": "mygkecluster-worker-1",
   >     "k8s-pod/controller-revision-hash": "mygkecluster-worker-87b575989",
   >     "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-kdqm"
   >   },
   >   "logName": "projects/mygcp-project/logs/stderr",
   >   "receiveTimestamp": "2024-07-05T00:02:03.578550371Z"
   > }
   > 
   > {
   >   "textPayload": "[\u001b[34m2024-07-05T01:01:58.407+0000\u001b[0m] {\u001b[34mtask_context_logger.py:\u001b[0m91} WARNING\u001b[0m - Marking task instance <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> stuck in queued as failed. If the task instance has available retries, it will be retried.\u001b[0m",
   >   "insertId": "11k0z0jmz77mlcu6",
   >   "resource": {
   >     "type": "k8s_container",
   >     "labels": {
   >       "container_name": "airflow-scheduler",
   >       "project_id": "mygcp-project",
   >       "namespace_name": "mynamespacedevdev",
   >       "cluster_name": "mygkecluster",
   >       "location": "us-central1",
   >       "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
   >     }
   >   },
   >   "timestamp": "2024-07-05T01:01:58.409116538Z",
   >   "severity": "INFO",
   >   "labels": {
   >     "k8s-pod/release": "mygkecluster",
   >     "k8s-pod/app": "airflow",
   >     "k8s-pod/component": "scheduler",
   >     "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
   >     "k8s-pod/pod-template-hash": "6b77fc67d"
   >   },
   >   "logName": "projects/mygcp-project/logs/stdout",
   >   "receiveTimestamp": "2024-07-05T01:02:02.907406580Z"
   > }
   > 
   > {
   >   "textPayload": "[\u001b[34m2024-07-05T01:02:09.431+0000\u001b[0m] {\u001b[34mtask_context_logger.py:\u001b[0m91} ERROR\u001b[0m - Executor reports task instance <TaskInstance: dynamic-map-group.supplier.agent manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?\u001b[0m",
   >   "insertId": "pldcpv17g4ggyycu",
   >   "resource": {
   >     "type": "k8s_container",
   >     "labels": {
   >       "project_id": "mygcp-project",
   >       "location": "us-central1",
   >       "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs",
   >       "cluster_name": "mygkecluster",
   >       "container_name": "airflow-scheduler",
   >       "namespace_name": "mynamespacedevdev"
   >     }
   >   },
   >   "timestamp": "2024-07-05T01:02:09.431825344Z",
   >   "severity": "INFO",
   >   "labels": {
   >     "k8s-pod/app": "airflow",
   >     "compute.googleapis.com/resource_name": "gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
   >     "k8s-pod/component": "scheduler",
   >     "k8s-pod/release": "mygkecluster",
   >     "k8s-pod/pod-template-hash": "6b77fc67d"
   >   },
   >   "logName": "projects/mygcp-project/logs/stdout",
   >   "receiveTimestamp": "2024-07-05T01:02:12.928364383Z"
   > }
   > ```
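   > 
   > For clarity, the DAG follows the standard dynamic task mapping pattern. A minimal sketch of what I mean is below (task names are borrowed from the log for readability; the bodies and list contents are placeholders, not the actual DAG code):
   > 
   > ```python
   > import pendulum
   > 
   > from airflow.decorators import dag, task
   > 
   > 
   > @dag(
   >     dag_id="dynamic-map",
   >     schedule=None,
   >     start_date=pendulum.datetime(2024, 1, 1),
   >     catchup=False,
   > )
   > def dynamic_map():
   >     @task
   >     def supplier():
   >         # One entry per mapped task instance; in my runs this list has hundreds of items.
   >         return list(range(500))
   > 
   >     @task
   >     def consumer(item):
   >         # Each mapped task instance processes a single item.
   >         return item
   > 
   >     # expand() creates one mapped task instance per element (map_index 0..n-1).
   >     consumer.expand(item=supplier())
   > 
   > 
   > dynamic_map()
   > ```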
   > 
   > I believe increasing the **AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT** value might fix the issue. However, the root cause is still unknown: why did the worker not process the task for an entire hour? An interesting observation is that the task failing with this error always belongs to the first instance of the DAG, e.g. if I have 8 instances of the DAG running, the error shows up in the 1st instance. This indicates that tasks from all the instances are being run and processed, but occasionally certain (not all) tasks from the 1st instance never get executed. It may have to do with overall throughput, so that tasks would not stay in the queue that long if it were higher, and I am not sure yet how to increase it. I have enough CPU and memory for the worker replicas as well as the schedulers. Any ideas?
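   > 
   > To make the throughput question concrete, this is the rough capacity arithmetic implied by the configuration above (illustrative only; the exact semantics of parallelism depend on the Airflow version and executor setup):
   > 
   > ```python
   > # Rough capacity arithmetic from the configuration above (illustrative only).
   > worker_replicas = 4
   > worker_concurrency = 60   # AIRFLOW__CELERY__WORKER_CONCURRENCY
   > core_parallelism = 160    # AIRFLOW__CORE__PARALLELISM
   > 
   > celery_slots = worker_replicas * worker_concurrency
   > print(celery_slots)       # 240 Celery task slots across the workers
   > print(core_parallelism)   # 160 concurrently running task instances allowed by core parallelism
   > # If core parallelism is the binding limit, tasks can sit queued even while Celery slots are free.
   > ```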
   
   I take back my previous comment; the issue occurs without reaching the task queued timeout. I did a few more tests and also turned on the debug log level. Please find the log from the worker:
   ```
   [2024-07-05 21:34:55,178: INFO/MainProcess] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] received
   [2024-07-05 21:34:55,178: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7c4547b2bb50> (args:('airflow.providers.celery.executors.celery_executor_utils.execute_command', 'c7d43e18-44af-4524-b83d-237e6b4d6a39', {'lang': 'py', 'task': 'airflow.providers.celery.executors.celery_executor_utils.execute_command', 'id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'parent_id': None, 'argsrepr': "[['airflow', 'tasks', 'run', 'dynamic-map', 'consumer', 'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir', 'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']]", 'kwargsrepr': '{}', 'origin': 'gen9371@mygkecluster-scheduler-545bb869c4-td9p7', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'reply_to': '941f6db4-8ed8-3a87-b084-d800ec5eec6f', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key':... kwargs:{})
   [2024-07-05 21:34:55,180: DEBUG/MainProcess] Task accepted: airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] pid:98
   [2024-07-05 21:34:55,201: INFO/ForkPoolWorker-31] [c7d43e18-44af-4524-b83d-237e6b4d6a39] Executing command in Celery: ['airflow', 'tasks', 'run', 'dynamic-map', 'consumer', 'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir', 'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']
   [2024-07-05 21:34:55,210: DEBUG/ForkPoolWorker-31] calling func 'task_run' with args Namespace(subcommand='run', dag_id='dynamic-map', task_id='consumer', execution_date_or_run_id='manual__2024-07-05T21:23:23.580900+00:00', cfg_path=None, depends_on_past='check', force=False, ignore_all_dependencies=False, ignore_dependencies=False, ignore_depends_on_past=False, interactive=False, job_id=None, local=True, map_index=243, mark_success=False, shut_down_logging=False, pickle=None, pool=None, raw=False, read_from_db=False, ship_dag=False, subdir='DAGS_FOLDER/dynamic_task_mapping.py', verbose=False, func=<function lazy_load_command.<locals>.command at 0x7c454c769480>, external_executor_id='c7d43e18-44af-4524-b83d-237e6b4d6a39')
   [2024-07-05 21:35:48,192: ERROR/ForkPoolWorker-31] [c7d43e18-44af-4524-b83d-237e6b4d6a39] Failed to execute task.
   [2024-07-05 21:35:48,547: ERROR/ForkPoolWorker-31] Task airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39] raised unexpected: AirflowException('Celery command failed on host: mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return Code: 256)')
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line 453, in trace_task
       R = retval = fun(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line 736, in __protected_call__
       return self.run(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 136, in execute_command
       _execute_in_fork(command_to_exec, celery_task_id)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py", line 151, in _execute_in_fork
       raise AirflowException(msg)
   airflow.exceptions.AirflowException: Celery command failed on host: mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return Code: 256)
   ```
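
   One more detail that may help with triage: assuming the "Return Code: 256" in the AirflowException above is a raw os.waitpid() status passed through by _execute_in_fork (I have not verified this against the provider source), it decodes to a plain exit code 1 rather than a signal:

   ```python
   import os

   status = 256  # "Return Code: 256" from the worker log above

   # Assuming this is a raw os.waitpid() status, decode it:
   print(os.WIFEXITED(status))    # True  -> the forked task process exited on its own
   print(os.WEXITSTATUS(status))  # 1     -> with exit code 1
   print(os.WIFSIGNALED(status))  # False -> it was not killed by a signal
   ```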

