vizeit commented on issue #39717:
URL: https://github.com/apache/airflow/issues/39717#issuecomment-2211493420
> I did a few tests with the new version 2.9.2 and have the following details from the logs.
>
> **Configuration**
>
> > Airflow version: 2.9.2
> > Compute: GKE
> > Executor: CeleryKubernetesExecutor
> > AIRFLOW__CORE__PARALLELISM: 160
> > AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
> > AIRFLOW__CELERY__WORKER_CONCURRENCY: 60
> > Worker replicas: 4
> > Scheduler replicas: 2
> > AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: 3600
>
> I am running multiple instances of a DAG with dynamic task mapping that
expands into hundreds of tasks. The log shows that the task gets scheduled and
queued (at 2024-07-05T00:01:59.683) but does not get executed within the task
queued timeout period, resulting in the reported error (at
2024-07-05T01:02:09.431).
>
> ```
> {
> "textPayload": "\t<TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [scheduled]>\u001b[0m",
> "insertId": "5fqi9x47wvvla4jh",
> "resource": {
> "type": "k8s_container",
> "labels": {
> "container_name": "airflow-scheduler",
> "namespace_name": "mynamespacedevdev",
> "location": "us-central1",
> "project_id": "mygcp-project",
> "cluster_name": "mygkecluster",
> "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
> }
> },
> "timestamp": "2024-07-05T00:01:50.234024733Z",
> "severity": "INFO",
> "labels": {
> "k8s-pod/release": "mygkecluster",
> "k8s-pod/component": "scheduler",
> "k8s-pod/pod-template-hash": "6b77fc67d",
> "compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
> "k8s-pod/app": "airflow"
> },
> "logName": "projects/mygcp-project/logs/stdout",
> "receiveTimestamp": "2024-07-05T00:01:52.886420247Z"
> }
>
> {
> "textPayload": "[2024-07-05 00:01:59,683: INFO/ForkPoolWorker-49]
Running <TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> on host
mygkecluster-worker-1.mygkecluster-worker.mynamespacedevdev.svc.cluster.local",
> "insertId": "4o989t3huassb59y",
> "resource": {
> "type": "k8s_container",
> "labels": {
> "container_name": "airflow-worker",
> "project_id": "mygcp-project",
> "location": "us-central1",
> "pod_name": "mygkecluster-worker-1",
> "namespace_name": "mynamespacedevdev",
> "cluster_name": "mygkecluster"
> }
> },
> "timestamp": "2024-07-05T00:01:59.683635856Z",
> "severity": "INFO",
> "labels": {
> "k8s-pod/apps_kubernetes_io/pod-index": "1",
> "k8s-pod/release": "mygkecluster",
> "k8s-pod/component": "worker",
> "k8s-pod/app": "airflow",
> "k8s-pod/statefulset_kubernetes_io/pod-name": "mygkecluster-worker-1",
> "k8s-pod/controller-revision-hash": "mygkecluster-worker-87b575989",
> "compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-kdqm"
> },
> "logName": "projects/mygcp-project/logs/stderr",
> "receiveTimestamp": "2024-07-05T00:02:03.578550371Z"
> }
>
> {
> "textPayload": "[\u001b[34m2024-07-05T01:01:58.407+0000\u001b[0m]
{\u001b[34mtask_context_logger.py:\u001b[0m91} WARNING\u001b[0m - Marking task
instance <TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> stuck in
queued as failed. If the task instance has available retries, it will be
retried.\u001b[0m",
> "insertId": "11k0z0jmz77mlcu6",
> "resource": {
> "type": "k8s_container",
> "labels": {
> "container_name": "airflow-scheduler",
> "project_id": "mygcp-project",
> "namespace_name": "mynamespacedevdev",
> "cluster_name": "mygkecluster",
> "location": "us-central1",
> "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
> }
> },
> "timestamp": "2024-07-05T01:01:58.409116538Z",
> "severity": "INFO",
> "labels": {
> "k8s-pod/release": "mygkecluster",
> "k8s-pod/app": "airflow",
> "k8s-pod/component": "scheduler",
> "compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
> "k8s-pod/pod-template-hash": "6b77fc67d"
> },
> "logName": "projects/mygcp-project/logs/stdout",
> "receiveTimestamp": "2024-07-05T01:02:02.907406580Z"
> }
>
> {
> "textPayload": "[\u001b[34m2024-07-05T01:02:09.431+0000\u001b[0m]
{\u001b[34mtask_context_logger.py:\u001b[0m91} ERROR\u001b[0m - Executor
reports task instance <TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> finished
(failed) although the task says it's queued. (Info: None) Was the task killed
externally?\u001b[0m",
> "insertId": "pldcpv17g4ggyycu",
> "resource": {
> "type": "k8s_container",
> "labels": {
> "project_id": "mygcp-project",
> "location": "us-central1",
> "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs",
> "cluster_name": "mygkecluster",
> "container_name": "airflow-scheduler",
> "namespace_name": "mynamespacedevdev"
> }
> },
> "timestamp": "2024-07-05T01:02:09.431825344Z",
> "severity": "INFO",
> "labels": {
> "k8s-pod/app": "airflow",
> "compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
> "k8s-pod/component": "scheduler",
> "k8s-pod/release": "mygkecluster",
> "k8s-pod/pod-template-hash": "6b77fc67d"
> },
> "logName": "projects/mygcp-project/logs/stdout",
> "receiveTimestamp": "2024-07-05T01:02:12.928364383Z"
> }
> ```
>
> I believe increasing the **AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT** value
might fix the issue. However, the root cause is still unknown: why did the worker
not process the task for an entire hour? An interesting observation is that the
failed task with this error is from the first instance of the DAG, e.g. if I have 8
instances of the DAG running, the error shows up in the 1st instance. This
indicates that tasks from all the instances are running and being processed, but
somehow, some of the time, certain (not all) tasks from the 1st instance never get
executed. It may have to do with the overall throughput, so that tasks never stay in
the queue for that long, and I am not sure yet how to increase it. I have enough
CPU and memory for the worker replicas as well as the scheduler. Any ideas?
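For context, the DAG follows the standard dynamic task mapping pattern; a minimal sketch of that shape is below (DAG and task names are loosely borrowed from the logs, everything else is illustrative):

```python
# Minimal illustrative sketch of the dynamic task mapping pattern in use.
# Names are loosely borrowed from the logs; the real DAG expands into
# hundreds of mapped task instances.
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def dynamic_map():
    @task
    def supplier() -> list[int]:
        # The real DAG produces hundreds of items here.
        return list(range(500))

    @task
    def consumer(item: int) -> None:
        print(f"processing {item}")

    # One mapped task instance per item returned by supplier().
    consumer.expand(item=supplier())


dynamic_map()
```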
I take back my previous comment: the issue occurs without reaching the task
queued timeout. I did a few more tests and also turned on the debug log level. Please
find the log from the worker below.
```
[2024-07-05 21:34:55,178: INFO/MainProcess] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39]
received
[2024-07-05 21:34:55,178: DEBUG/MainProcess] TaskPool: Apply <function
fast_trace_task at 0x7c4547b2bb50>
(args:('airflow.providers.celery.executors.celery_executor_utils.execute_command',
'c7d43e18-44af-4524-b83d-237e6b4d6a39', {'lang': 'py', 'task':
'airflow.providers.celery.executors.celery_executor_utils.execute_command',
'id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'shadow': None, 'eta': None,
'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit':
[None, None], 'root_id': 'c7d43e18-44af-4524-b83d-237e6b4d6a39', 'parent_id':
None, 'argsrepr': "[['airflow', 'tasks', 'run', 'dynamic-map', 'consumer',
'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir',
'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']]", 'kwargsrepr':
'{}', 'origin': 'gen9371@mygkecluster-scheduler-545bb869c4-td9p7',
'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None,
'stamps': {}, 'properties': {'correlation_id': 'c7d43e18-44af-4524-b83
d-237e6b4d6a39', 'reply_to': '941f6db4-8ed8-3a87-b084-d800ec5eec6f',
'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key':...
kwargs:{})
[2024-07-05 21:34:55,180: DEBUG/MainProcess] Task accepted:
airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39]
pid:98
[2024-07-05 21:34:55,201: INFO/ForkPoolWorker-31]
[c7d43e18-44af-4524-b83d-237e6b4d6a39] Executing command in Celery: ['airflow',
'tasks', 'run', 'dynamic-map', 'consumer',
'manual__2024-07-05T21:23:23.580900+00:00', '--local', '--subdir',
'DAGS_FOLDER/dynamic_task_mapping.py', '--map-index', '243']
[2024-07-05 21:34:55,210: DEBUG/ForkPoolWorker-31] calling func 'task_run'
with args Namespace(subcommand='run', dag_id='dynamic-map', task_id='consumer',
execution_date_or_run_id='manual__2024-07-05T21:23:23.580900+00:00',
cfg_path=None, depends_on_past='check', force=False,
ignore_all_dependencies=False, ignore_dependencies=False,
ignore_depends_on_past=False, interactive=False, job_id=None, local=True,
map_index=243, mark_success=False, shut_down_logging=False, pickle=None,
pool=None, raw=False, read_from_db=False, ship_dag=False,
subdir='DAGS_FOLDER/dynamic_task_mapping.py', verbose=False, func=<function
lazy_load_command.<locals>.command at 0x7c454c769480>,
external_executor_id='c7d43e18-44af-4524-b83d-237e6b4d6a39')
[2024-07-05 21:35:48,192: ERROR/ForkPoolWorker-31]
[c7d43e18-44af-4524-b83d-237e6b4d6a39] Failed to execute task.
[2024-07-05 21:35:48,547: ERROR/ForkPoolWorker-31] Task
airflow.providers.celery.executors.celery_executor_utils.execute_command[c7d43e18-44af-4524-b83d-237e6b4d6a39]
raised unexpected: AirflowException('Celery command failed on host:
mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local
with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return
Code: 256)')
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line
453, in trace_task
R = retval = fun(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/celery/app/trace.py", line
736, in __protected_call__
return self.run(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 136, in execute_command
_execute_in_fork(command_to_exec, celery_task_id)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/celery/executors/celery_executor_utils.py",
line 151, in _execute_in_fork
raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host:
mygkecluster-worker-2.mygkecluster-worker.mygkeclusterdev.svc.cluster.local
with celery_task_id c7d43e18-44af-4524-b83d-237e6b4d6a39 (PID: 26675, Return
Code: 256)
```
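To see how long task instances are actually sitting in queued (rather than waiting for the stuck-in-queued check to fire), a rough read-only diagnostic along these lines can be run against the metadata DB. It is only a sketch: it assumes the standard TaskInstance ORM fields, and the threshold is arbitrary.

```python
# Rough diagnostic sketch: list task instances that have been in QUEUED
# longer than a threshold. Read-only; uses standard Airflow ORM fields.
from datetime import timedelta

from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState


def stuck_in_queued(older_than_minutes: int = 10):
    cutoff = timezone.utcnow() - timedelta(minutes=older_than_minutes)
    with create_session() as session:
        # Return plain column tuples so nothing depends on the open session.
        return (
            session.query(
                TaskInstance.dag_id,
                TaskInstance.task_id,
                TaskInstance.map_index,
                TaskInstance.queued_dttm,
            )
            .filter(
                TaskInstance.state == TaskInstanceState.QUEUED,
                TaskInstance.queued_dttm < cutoff,
            )
            .all()
        )


for dag_id, task_id, map_index, queued_at in stuck_in_queued():
    print(dag_id, task_id, map_index, queued_at)
```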