vizeit commented on issue #39717:
URL: https://github.com/apache/airflow/issues/39717#issuecomment-2210232607
I did a few tests with the new version, 2.9.2, and have the following details along with the relevant log entries.
**Configuration**
> Airflow version: 2.9.2
> Compute: GKE
> Executor: CeleryKubernetesExecutor
> AIRFLOW__CORE__PARALLELISM: 160
> AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
> AIRFLOW__CELERY__WORKER_CONCURRENCY: 60
> Worker replicas: 4
> Scheduler replicas: 2
> AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: 3600
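To rule out the env overrides silently not being picked up, the effective values can be checked from inside a scheduler or worker pod with a quick sketch like the one below (it only reads the standard Airflow config API; the expected values are the ones listed above):

```python
# Sanity-check sketch: confirm the settings the pods actually run with.
from airflow.configuration import conf

print(conf.getint("core", "parallelism"))                 # expected: 160
print(conf.getint("celery", "worker_concurrency"))        # expected: 60
print(conf.getint("scheduler", "max_tis_per_query"))      # expected: 0
print(conf.getfloat("scheduler", "task_queued_timeout"))  # expected: 3600.0 (seconds)
```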
I am running multiple instances of a DAG with dynamic task mapping that expands into hundreds of tasks; a rough sketch of the mapping pattern is below, and the relevant log entries follow it. The log shows that the task gets scheduled and queued (at 2024-07-05T00:01:59.683) but does not get executed within the task queued timeout period, resulting in the reported error (at 2024-07-05T01:02:09.431).
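The sketch is reconstructed from the task id in the log (`dynamic-map-group.supplier.agent`); the names, the mapped item count, and the per-item work are placeholders, not the actual DAG.

```python
# Illustrative sketch only: structure guessed from the task id in the logs;
# item counts and per-item work are placeholders.
from datetime import datetime

from airflow.decorators import dag, task, task_group


@dag(dag_id="dynamic-map-group", schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def dynamic_map_group():

    @task
    def list_items():
        # Hypothetical upstream that produces hundreds of mapped inputs.
        return [f"item-{i}" for i in range(400)]

    @task_group
    def supplier(item: str):
        @task
        def agent(value: str):
            # Placeholder for the real per-item work.
            return value

        agent(item)

    # Expands the group once per item -> hundreds of "supplier.agent" task instances per run.
    supplier.expand(item=list_items())


dynamic_map_group()
```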
````
{
"textPayload": "\t<TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [scheduled]>\u001b[0m",
"insertId": "5fqi9x47wvvla4jh",
"resource": {
"type": "k8s_container",
"labels": {
"container_name": "airflow-scheduler",
"namespace_name": "mynamespacedevdev",
"location": "us-central1",
"project_id": "mygcp-project",
"cluster_name": "mygkecluster",
"pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
}
},
"timestamp": "2024-07-05T00:01:50.234024733Z",
"severity": "INFO",
"labels": {
"k8s-pod/release": "mygkecluster",
"k8s-pod/component": "scheduler",
"k8s-pod/pod-template-hash": "6b77fc67d",
"compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
"k8s-pod/app": "airflow"
},
"logName": "projects/mygcp-project/logs/stdout",
"receiveTimestamp": "2024-07-05T00:01:52.886420247Z"
}
{
"textPayload": "[2024-07-05 00:01:59,683: INFO/ForkPoolWorker-49] Running
<TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> on host
mygkecluster-worker-1.mygkecluster-worker.mynamespacedevdev.svc.cluster.local",
"insertId": "4o989t3huassb59y",
"resource": {
"type": "k8s_container",
"labels": {
"container_name": "airflow-worker",
"project_id": "mygcp-project",
"location": "us-central1",
"pod_name": "mygkecluster-worker-1",
"namespace_name": "mynamespacedevdev",
"cluster_name": "mygkecluster"
}
},
"timestamp": "2024-07-05T00:01:59.683635856Z",
"severity": "INFO",
"labels": {
"k8s-pod/apps_kubernetes_io/pod-index": "1",
"k8s-pod/release": "mygkecluster",
"k8s-pod/component": "worker",
"k8s-pod/app": "airflow",
"k8s-pod/statefulset_kubernetes_io/pod-name": "mygkecluster-worker-1",
"k8s-pod/controller-revision-hash": "mygkecluster-worker-87b575989",
"compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-kdqm"
},
"logName": "projects/mygcp-project/logs/stderr",
"receiveTimestamp": "2024-07-05T00:02:03.578550371Z"
}
{
"textPayload": "[\u001b[34m2024-07-05T01:01:58.407+0000\u001b[0m]
{\u001b[34mtask_context_logger.py:\u001b[0m91} WARNING\u001b[0m - Marking task
instance <TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> stuck in
queued as failed. If the task instance has available retries, it will be
retried.\u001b[0m",
"insertId": "11k0z0jmz77mlcu6",
"resource": {
"type": "k8s_container",
"labels": {
"container_name": "airflow-scheduler",
"project_id": "mygcp-project",
"namespace_name": "mynamespacedevdev",
"cluster_name": "mygkecluster",
"location": "us-central1",
"pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
}
},
"timestamp": "2024-07-05T01:01:58.409116538Z",
"severity": "INFO",
"labels": {
"k8s-pod/release": "mygkecluster",
"k8s-pod/app": "airflow",
"k8s-pod/component": "scheduler",
"compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
"k8s-pod/pod-template-hash": "6b77fc67d"
},
"logName": "projects/mygcp-project/logs/stdout",
"receiveTimestamp": "2024-07-05T01:02:02.907406580Z"
}
{
"textPayload": "[\u001b[34m2024-07-05T01:02:09.431+0000\u001b[0m]
{\u001b[34mtask_context_logger.py:\u001b[0m91} ERROR\u001b[0m - Executor
reports task instance <TaskInstance: dynamic-map-group.supplier.agent
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> finished
(failed) although the task says it's queued. (Info: None) Was the task killed
externally?\u001b[0m",
"insertId": "pldcpv17g4ggyycu",
"resource": {
"type": "k8s_container",
"labels": {
"project_id": "mygcp-project",
"location": "us-central1",
"pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs",
"cluster_name": "mygkecluster",
"container_name": "airflow-scheduler",
"namespace_name": "mynamespacedevdev"
}
},
"timestamp": "2024-07-05T01:02:09.431825344Z",
"severity": "INFO",
"labels": {
"k8s-pod/app": "airflow",
"compute.googleapis.com/resource_name":
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
"k8s-pod/component": "scheduler",
"k8s-pod/release": "mygkecluster",
"k8s-pod/pod-template-hash": "6b77fc67d"
},
"logName": "projects/mygcp-project/logs/stdout",
"receiveTimestamp": "2024-07-05T01:02:12.928364383Z"
}
````
I believe increasing the **AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT** value will fix the issue. However, the root cause is still unknown: why did the worker not process the task for an entire hour? An interesting observation is that the task failing with this error belongs to the first instance of the DAG, e.g. if I have 8 instances of the DAG running, the error shows up in the 1st instance. This indicates that tasks from all the instances are being picked up and processed, but somewhere along the way a task from the 1st instance never gets executed. It may come down to overall throughput, i.e. keeping throughput high enough that tasks never stay in the queue that long, and I am not sure yet how to increase it. I have enough CPU and memory for the worker replicas as well as the schedulers. Any ideas?
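For what it is worth, a rough back-of-envelope calculation makes an hour-long queue wait look plausible under this configuration. The slot counts below come from the settings above, but the number of mapped tasks per run and the average task runtime are guesses, so this only shows the shape of the problem, not the exact numbers:

```python
# Back-of-envelope queue-wait estimate; mapped_tasks_per_run and avg_task_seconds are hypothetical.
workers = 4
worker_concurrency = 60                      # AIRFLOW__CELERY__WORKER_CONCURRENCY
celery_slots = workers * worker_concurrency  # 240 Celery slots in total

core_parallelism = 160                       # AIRFLOW__CORE__PARALLELISM
effective_slots = min(celery_slots, core_parallelism)  # whichever cap binds in practice

dag_runs = 8                                 # concurrent DAG runs, as described above
mapped_tasks_per_run = 400                   # guess
queued_tasks = dag_runs * mapped_tasks_per_run

avg_task_seconds = 180                       # guess
waves = -(-queued_tasks // effective_slots)  # ceil(queued_tasks / effective_slots)
print(waves * avg_task_seconds)              # 20 waves * 180 s = 3600 s worst-case wait
```

If something like that is happening here, raising the timeout only hides the symptom, and the real lever would be throughput: more worker slots, a higher `core.parallelism`, or fewer tasks queued at once. Whether the math holds depends on the real expansion sizes and runtimes, which is the part I have not pinned down yet.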