# Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues
Hi folks 👋,
We're experiencing critical issues with Airflow `3.1.5` using mixed executors
(`KubernetesExecutor`,
`CeleryExecutor`) that prevent `CeleryExecutor` tasks from running
successfully. This is blocking our
migration from Airflow `2.x` to `3.x`.
## Environment
- Airflow Version: `3.1.5`
- Executor Config: `AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor`
- Deployment: `Kubernetes (EKS)`
- Celery Broker: `AWS SQS (FIFO queues)`
- Celery Result Backend: `PostgreSQL`
- Database: `PostgreSQL (RDS)`
- Usage:
- `CeleryExecutor`: `dbt` jobs (high-concurrency, short-duration,
SLA-critical)
- `KubernetesExecutor`: Resource-intensive jobs requiring custom pod specs
- ConfigMap:
```yaml
AIRFLOW__CELERY__BROKER_URL=sqs://
AIRFLOW__CELERY__OPERATION_TIMEOUT=30
AIRFLOW__CELERY__FLOWER_PORT=5555
AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes
AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG
AIRFLOW__CELERY__SSL_ACTIVE=false
AIRFLOW__CELERY__FLOWER_HOST=0.0.0.0
AIRFLOW__CELERY__SYNC_PARALLELISM=0
```
- Celery Config:
```json
{
"accept_content": [
"json"
],
"event_serializer": "json",
"worker_prefetch_multiplier": 1,
"task_acks_late": true,
"task_default_queue": "default",
"task_default_exchange": "default",
"task_track_started": true,
"broker_url": "sqs://",
"broker_transport_options": {
"visibility_timeout": 32000,
"region_name": "eu-west-1",
"region": "eu-west-1",
"predefined_queues": {
"vc-sqs-celery-worker.fifo": {
"url":
"https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo"
}
}
},
"broker_connection_retry_on_startup": true,
"result_backend": "postgresql",
"database_engine_options": {},
"worker_concurrency": 16,
"worker_enable_remote_control": true,
"worker_redirect_stdouts": false,
"worker_hijack_root_logger": false
}
```
## Configuration
DAG-level executor specification:
```yaml
# config.yaml
dag:
default_args:
executor: CeleryExecutor
pool: pool_dbt_hourly_p0
retries: 2
start_date: "2020-11-30 00:00:00"
```
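For context, those YAML defaults reach the DAG roughly like this (a simplified sketch; the loading and date parsing shown here are illustrative, not our exact code):
```python
# Illustrative only: how config.yaml default_args are handed to the DAG object.
from datetime import datetime

import yaml

from airflow.sdk import DAG  # Airflow 3 authoring interface

with open("config.yaml") as fh:
    dag_config = yaml.safe_load(fh)["dag"]

default_args = dict(dag_config["default_args"])
default_args["start_date"] = datetime.fromisoformat(default_args["start_date"])

with DAG(dag_id="dbt_hourly_p0_product", default_args=default_args) as dag:
    ...  # operators defined here pick up executor/pool/retries from default_args
```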
Celery queue configuration:
```python
# celery_config.py
# Merged with DEFAULT_CELERY_CONFIG from airflow.providers.celery.executors.default_celery
# to become CELERY_CONFIG (referenced by AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS)
EXTRA_CELERY_CONFIG = {
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
        "predefined_queues": {
            "vc-sqs-celery-worker.fifo": {
                "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo",
            },
        },
    },
}
```
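For context, `CELERY_CONFIG` is assembled by merging these extras onto Airflow's defaults; a minimal sketch of that merge (the shallow `update` here is illustrative, our real helper may merge nested dicts differently):
```python
# Illustrative sketch of how CELERY_CONFIG is built from the provider defaults
# plus EXTRA_CELERY_CONFIG; a shallow update is shown for brevity.
from copy import deepcopy

from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = deepcopy(DEFAULT_CELERY_CONFIG)
CELERY_CONFIG.update(EXTRA_CELERY_CONFIG)  # picked up via AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS
```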
Queue passed to tasks:
```python
# DAG code
queue = dag_config.DEFAULT_CELERY_QUEUE # "vc-sqs-celery-worker.fifo"
task = DbtBaseOperator(
task_id="run_model",
queue=queue,
# ... other params
)
```
### Evidence of correct configuration (by importing the DAG and inspecting it inside the pod):
```python
In [1]: dag.task_dict
Out[1]:
{'start': <Task(EmptyOperator): start>,
'run_product': <Task(DbtCosmosBaseOperator): run_product>,
'test_product': <Task(DbtCosmosBaseOperator): test_product>,
'snapshot_product': <Task(DbtCosmosBaseOperator): snapshot_product>,
'end': <Task(EmptyOperator): end>}
In [2]: dag.task_dict["run_product"].queue
Out[2]: 'vc-sqs-celery-worker.fifo'
In [3]: dag.task_dict["run_product"].executor
Out[3]: 'CeleryExecutor'
```
Tasks are correctly configured with both `executor` and `queue` parameters.
## The Issues
### Issue 1: Task Instance Not Found (CRITICAL - Blocks all CeleryExecutor
tasks)
**Tasks with `executor: CeleryExecutor` consistently fail with "Task Instance
not found" errors,
preventing any task execution.**
Celery Worker Logs:
```
2026-02-04T15:17:15.096Z [info] [9e301d75...] Executing workload in Celery:
ti=TaskInstance(id=UUID('019c293a-994b-7abe-839f-4bcb7c1b2e1f'),
task_id='run_product', dag_id='dbt_hourly_p0_product',
run_id='scheduled__2026-02-04T14:30:00+00:00', try_number=4,
queue='vc-sqs-celery-worker.fifo', ...)
2026-02-04T15:17:15.565Z [info] Secrets backends loaded for worker
[15-30 seconds of DAG code loading, Variable API calls]
2026-02-04T15:17:45.678Z [info] Process exited exit_code=-9 signal_sent=SIGKILL
2026-02-04T15:17:45.692Z [error] Task execute_workload[9e301d75...] raised unexpected: ServerResponseError('Server returned error')
Traceback:
  File "airflow/sdk/execution_time/supervisor.py", line 966, in _on_child_started
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
airflow.sdk.api.client.ServerResponseError: Server returned error
Correlation-id=019c293b-1c2a-7cdd-9b47-294f34d0a7c5
2026-02-04T15:17:48.482Z [error] Server indicated the task shouldn't be running anymore
[supervisor] detail={'detail': {'reason': 'not_found', 'message': 'Task Instance not found'}} status_code=404
```
Scheduler Logs:
```
2026-02-04T15:33:17.559Z [error] Executor CeleryExecutor(parallelism=32) reported
that the task instance <TaskInstance: dbt_hourly_p0_product.run_product
scheduled__2026-02-04T14:30:00+00:00 [running]> finished with state failed,
but the task instance's state attribute is running. Learn more:
https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
```
Timeline:
- Task instance created by scheduler
- Sent to Celery queue immediately
- Worker receives and starts execution
- 15-30 seconds later: Worker subprocess killed with `SIGKILL`
- API server reports task instance doesn't exist (`404`)
- Scheduler state: `running`, Executor state: `failed` → state desync
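For what it's worth, the scheduler log above suggests the task instance row still exists (state `running`) while the worker gets the 404. A hypothetical way to confirm that from a scheduler pod against the metadata DB (diagnostic sketch only, not something we rely on):
```python
# Hypothetical diagnostic: does the TaskInstance row exist while the worker reports 404?
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    ti = (
        session.query(TaskInstance)
        .filter_by(
            dag_id="dbt_hourly_p0_product",
            task_id="run_product",
            run_id="scheduled__2026-02-04T14:30:00+00:00",
        )
        .one_or_none()
    )
    print(ti.state if ti else "no TaskInstance row found")
```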
### Issue 2: TaskFlow Tasks Default to `queue='default'`
TaskFlow API tasks (`@task` decorator) don't inherit the `queue` parameter from
DAG code, defaulting
to `queue='default'`:
```
2026-02-04T16:35:02.311Z [error] Error sending Celery task:
Queue with name 'default' must be defined in 'predefined_queues'.
Celery Task ID:
TaskInstanceKey(dag_id='dbt_snowplow_p0_via_pkgs_cosmos_rotation',
task_id='pass_wats', run_id='scheduled__2026-02-04T16:30:00+00:00', ...)
Traceback:
  File "kombu/transport/SQS.py", line 390, in _resolve_queue_url
    return self._queue_cache[sqs_qname]
KeyError: 'default'
```
Task definition:
```python
@dag(**dag_kwargs)  # dag_kwargs['default_args'] has executor: CeleryExecutor
def my_dag():
    @task(task_id="pass_wats")
    def pass_wats(**context):
        return some_value
```
Even though the DAG code sets:
```python
queue = dag_config.DEFAULT_CELERY_QUEUE # "vc-sqs-celery-worker.fifo"
```
And passes it to explicit operators:
```python
DbtBaseOperator(task_id="run_model", queue=queue) # ✅ Works
```
TaskFlow tasks created with `@task` decorator don't inherit this and default to
`queue='default'`.
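A possible workaround sketch (not validated on our side, and it would not address Issue 1) is to pass `queue` and `executor` directly on the decorator, since `@task` forwards `BaseOperator` keyword arguments:
```python
from airflow.sdk import dag, task  # Airflow 3 task SDK imports

# Stand-in for the dag_kwargs from the snippet above.
dag_kwargs = {
    "dag_id": "dbt_snowplow_p0_via_pkgs_cosmos_rotation",
    "default_args": {"executor": "CeleryExecutor"},
}

@dag(**dag_kwargs)
def my_dag():
    @task(
        task_id="pass_wats",
        queue="vc-sqs-celery-worker.fifo",  # explicit, instead of relying on inheritance
        executor="CeleryExecutor",
    )
    def pass_wats(**context):
        return "ok"  # placeholder for the real return value

    pass_wats()

my_dag()
```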
## What Works
- ✅ `KubernetesExecutor` tasks work perfectly (no issues)
- ✅ Airflow 2.x with `CeleryExecutor` worked flawlessly
- ✅ Explicit operator tasks with `queue=queue` parameter work correctly
- ✅ Tasks correctly show `executor='CeleryExecutor'` and
`queue='vc-sqs-celery-worker.fifo'` when inspected
## Questions for the Community
1. Why does the API server return `404` "Task Instance not found" when the
worker tries to start a
task that the scheduler created seconds earlier? Is there a synchronization
issue between
scheduler, API server, and executors?
2. Is the per-task `executor=` parameter fully supported in Airflow 3.x mixed
executor setups? The
documentation suggests it is (since 2.6+), but we're experiencing critical
failures.
3. For TaskFlow tasks (`@task` decorator) with mixed executors:
- Should `queue` be specified in `default_args`?
- Or is there a different mechanism for TaskFlow tasks?
- Why doesn't the `queue` parameter set in DAG code propagate to TaskFlow
tasks?
4. Is this a known issue being tracked? We found similar reports on Slack
(users experiencing
identical symptoms with 3.1.5 mixed executors) but no GitHub issues.
5. State desync between executor and scheduler - Is this expected behavior or a
bug? The executor
reports `"failed"` but the task instance remains in `"running"` state.
6. Are there recommended workarounds for 3.x mixed executors, or should we
avoid this
configuration entirely until fixed?
## What We've Tried
1. Increased `AIRFLOW__CELERY__OPERATION_TIMEOUT` from `30s` to `300s`
2. Verified that the scheduler and API server use identical database connections
3. Confirmed the Celery queues are properly configured in `predefined_queues`
4. Verified that tasks carry the correct `executor` and `queue` attributes when inspected
5. Set `AIRFLOW__CELERY__DEFAULT_QUEUE`, which did not resolve TaskFlow task routing (a `default_args` sketch follows this list)
6. Despite all of the above, every `CeleryExecutor` task still fails with "Task Instance not found"
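For completeness, the `default_args` variant asked about in question 3 would look roughly like this (a sketch only; we have not verified that TaskFlow tasks pick up `queue` from `default_args` in 3.1.5):
```python
# Sketch: routing every task, including TaskFlow tasks, via default_args (see question 3).
# Not yet validated in our environment.
dag_kwargs = {
    "default_args": {
        "executor": "CeleryExecutor",
        "queue": "vc-sqs-celery-worker.fifo",
        "pool": "pool_dbt_hourly_p0",
        "retries": 2,
    },
}
```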
GitHub link: https://github.com/apache/airflow/discussions/61468