GitHub user benbenbang created a discussion: Airflow 3.1.5 Mixed Executors - Task Instance Not Found & State Desync Issues

Hi folks 👋,

We're experiencing critical issues with Airflow `3.1.5` using mixed executors 
(`KubernetesExecutor`,
`CeleryExecutor`) that prevent `CeleryExecutor` tasks from running 
successfully. This is blocking our
migration from Airflow `2.x` to `3.x`.

## Environment

- Airflow Version: `3.1.5`
- Executor Config: `AIRFLOW__CORE__EXECUTOR=KubernetesExecutor,CeleryExecutor`
- Deployment: `Kubernetes (EKS)`
- Celery Broker: `AWS SQS (FIFO queues)`
- Celery Result Backend: `PostgreSQL`
- Database: `PostgreSQL (RDS)`
- Usages:
    - `CeleryExecutor`: `dbt` jobs (high-concurrency, short-duration, 
SLA-critical)
    - `KubernetesExecutor`: Resource-intensive jobs requiring custom pod specs
- ConfigMap:
  ```yaml
  AIRFLOW__CELERY__BROKER_URL=sqs://
  AIRFLOW__CELERY__OPERATION_TIMEOUT=30
  AIRFLOW__CELERY__FLOWER_PORT=5555
  AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE=kubernetes
  AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG
  AIRFLOW__CELERY__SSL_ACTIVE=false
  AIRFLOW__CELERY__FLOWER_HOST=0.0.0.0
  AIRFLOW__CELERY__SYNC_PARALLELISM=0
  ```
- Celery Config:
  ```json
  {
    "accept_content": [
        "json"
    ],
    "event_serializer": "json",
    "worker_prefetch_multiplier": 1,
    "task_acks_late": true,
    "task_default_queue": "default",
    "task_default_exchange": "default",
    "task_track_started": true,
    "broker_url": "sqs://",
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
        "region": "eu-west-1",
        "predefined_queues": {
            "vc-sqs-celery-worker.fifo": {
                "url": 
"https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo";
            }
        }
    },
    "broker_connection_retry_on_startup": true,
    "result_backend": "postgresql",
    "database_engine_options": {},
    "worker_concurrency": 16,
    "worker_enable_remote_control": true,
    "worker_redirect_stdouts": false,
    "worker_hijack_root_logger": false
  }
  ```


## Configuration

DAG-level executor specification:
```yaml
# config.yaml
dag:
  default_args:
    executor: CeleryExecutor
    pool: pool_dbt_hourly_p0
    retries: 2
    start_date: "2020-11-30 00:00:00"
```
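
For context, that YAML is simply loaded and turned into the `dag_kwargs` / `default_args` we pass to the DAG, so `executor` and `pool` reach every task via `default_args`. A simplified sketch (the loader below is illustrative, not our exact code; only the YAML keys come from our real config):

```python
# Simplified sketch of how config.yaml becomes dag_kwargs / default_args.
# The loader is illustrative; the point is that `executor: CeleryExecutor`
# travels to every task through default_args.
import pendulum
import yaml

with open("config.yaml") as f:
    cfg = yaml.safe_load(f)["dag"]

default_args = dict(cfg["default_args"])
default_args["start_date"] = pendulum.parse(default_args["start_date"])

# Used later as @dag(**dag_kwargs); executor/pool/retries propagate to tasks.
dag_kwargs = {
    "default_args": default_args,
    "catchup": False,
}
```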

Celery queue configuration:
```python
# celery_config.py
# merged with DEFAULT_CELERY_CONFIG from
# airflow.providers.celery.executors.default_celery to become CELERY_CONFIG

EXTRA_CELERY_CONFIG = {
    "broker_transport_options": {
        "visibility_timeout": 32000,
        "region_name": "eu-west-1",
        "predefined_queues": {
            "vc-sqs-celery-worker.fifo": {
                "url": "https://sqs.eu-west-1.amazonaws.com/xxx/vc-sqs-celery-worker.fifo",
            },
        },
    },
}
```
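
For completeness, the merge that produces `CELERY_CONFIG` (the object referenced by `AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config.CELERY_CONFIG`) is roughly the following; the overlay helper here is a sketch, not our exact code:

```python
# celery_config.py, continued -- sketch of how CELERY_CONFIG is assembled
# from the provider defaults plus EXTRA_CELERY_CONFIG above.
from copy import deepcopy

from airflow.providers.celery.executors.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = deepcopy(DEFAULT_CELERY_CONFIG)
for key, value in EXTRA_CELERY_CONFIG.items():
    if isinstance(value, dict):
        CELERY_CONFIG.setdefault(key, {}).update(value)  # shallow merge of nested dicts
    else:
        CELERY_CONFIG[key] = value
```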

Queue passed to tasks:
```python
# DAG code
queue = dag_config.DEFAULT_CELERY_QUEUE  # "vc-sqs-celery-worker.fifo"
task = DbtBaseOperator(
    task_id="run_model",
    queue=queue,
    # ... other params
)
```

### Evidence of correct configuration (importing the DAG and inspecting it in the pod)
```python
In [1]: dag.task_dict
Out[1]:
{'start': <Task(EmptyOperator): start>,
 'run_product': <Task(DbtCosmosBaseOperator): run_product>,
 'test_product': <Task(DbtCosmosBaseOperator): test_product>,
 'snapshot_product': <Task(DbtCosmosBaseOperator): snapshot_product>,
 'end': <Task(EmptyOperator): end>}

In [2]: dag.task_dict["run_product"].queue
Out[2]: 'vc-sqs-celery-worker.fifo'

In [3]: dag.task_dict["run_product"].executor
Out[3]: 'CeleryExecutor'
```

Tasks are correctly configured with both `executor` and `queue` parameters.

## The Issues

### Issue 1: Task Instance Not Found (CRITICAL - blocks all CeleryExecutor tasks)

**Tasks with `executor: CeleryExecutor` consistently fail with "Task Instance not found" errors, preventing any task execution.**

Celery Worker Logs:
```
2026-02-04T15:17:15.096Z [info] [9e301d75...] Executing workload in Celery:
  ti=TaskInstance(id=UUID('019c293a-994b-7abe-839f-4bcb7c1b2e1f'),
  task_id='run_product', dag_id='dbt_hourly_p0_product',
  run_id='scheduled__2026-02-04T14:30:00+00:00', try_number=4,
  queue='vc-sqs-celery-worker.fifo', ...)

2026-02-04T15:17:15.565Z [info] Secrets backends loaded for worker

[15-30 seconds of DAG code loading, Variable API calls]

2026-02-04T15:17:45.678Z [info] Process exited exit_code=-9 signal_sent=SIGKILL

2026-02-04T15:17:45.692Z [error] Task execute_workload[9e301d75...] raised unexpected:
  ServerResponseError('Server returned error')

  Traceback:
    File "airflow/sdk/execution_time/supervisor.py", line 966, in _on_child_started
      ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  airflow.sdk.api.client.ServerResponseError: Server returned error
  Correlation-id=019c293b-1c2a-7cdd-9b47-294f34d0a7c5

2026-02-04T15:17:48.482Z [error] Server indicated the task shouldn't be running anymore
  [supervisor] detail={'detail': {'reason': 'not_found', 'message': 'Task Instance not found'}} status_code=404
```

Scheduler Logs:
```
2026-02-04T15:33:17.559Z [error] Executor CeleryExecutor(parallelism=32) reported
  that the task instance <TaskInstance: dbt_hourly_p0_product.run_product
  scheduled__2026-02-04T14:30:00+00:00 [running]> finished with state failed,
  but the task instance's state attribute is running.

  Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
```

Timeline:
- Task instance created by scheduler
- Sent to Celery queue immediately
- Worker receives and starts execution
- 15-30 seconds later: Worker subprocess killed with `SIGKILL`
- API server reports task instance doesn't exist (`404`)
- Scheduler state: `running`, Executor state: `failed` → state desync
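
To help pin this down, we can check at the moment of the 404 whether the `task_instance` row exists and whether its UUID matches the `ti.id` the worker logged. A rough diagnostic sketch against the metadata DB (the DSN and column list are assumptions on our side, not part of our deployment):

```python
# Rough diagnostic sketch: does the task_instance row exist, and does its
# UUID match the ti.id from the worker log when the API server returns 404?
import psycopg2

DSN = "postgresql://airflow:***@our-rds-host:5432/airflow"  # placeholder

SQL = """
    SELECT id, state, try_number, queue
    FROM task_instance
    WHERE dag_id = %s AND task_id = %s AND run_id = %s
"""

with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
    cur.execute(
        SQL,
        (
            "dbt_hourly_p0_product",
            "run_product",
            "scheduled__2026-02-04T14:30:00+00:00",
        ),
    )
    for row in cur.fetchall():
        # Compare row[0] with UUID('019c293a-994b-...') from the worker log;
        # a mismatch would mean the worker holds a stale task instance id.
        print(row)
```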

### Issue 2: TaskFlow Tasks Default to `queue='default'`

TaskFlow API tasks (`@task` decorator) don't inherit the `queue` parameter from 
DAG code, defaulting
to `queue='default'`:

```
2026-02-04T16:35:02.311Z [error] Error sending Celery task:
  Queue with name 'default' must be defined in 'predefined_queues'.

Celery Task ID: TaskInstanceKey(dag_id='dbt_snowplow_p0_via_pkgs_cosmos_rotation',
  task_id='pass_wats', run_id='scheduled__2026-02-04T16:30:00+00:00', ...)

Traceback:
  File "kombu/transport/SQS.py", line 390, in _resolve_queue_url
    return self._queue_cache[sqs_qname]
KeyError: 'default'
```

Task definition:
```python
@dag(**dag_kwargs)  # dag_kwargs['default_args'] has executor: CeleryExecutor
def my_dag():
    @task(task_id="pass_wats")
    def pass_wats(**context):
        return some_value
```

Even though the DAG code sets:
```python
queue = dag_config.DEFAULT_CELERY_QUEUE  # "vc-sqs-celery-worker.fifo"
```
And passes it to explicit operators:
```python
DbtBaseOperator(task_id="run_model", queue=queue)  # ✅ Works
```

TaskFlow tasks created with `@task` decorator don't inherit this and default to 
`queue='default'`.
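
As a stopgap we are considering passing the queue explicitly, either through `default_args` or on the decorator itself. A minimal sketch of both options (this is our assumption about the intended mechanism, which is exactly what question 3 below asks):

```python
# Sketch of the two TaskFlow workarounds we are considering; both are our
# assumptions, not confirmed Airflow 3.x guidance for mixed executors.
from airflow.sdk import dag, task  # Airflow 3.x Task SDK decorators

QUEUE = "vc-sqs-celery-worker.fifo"

@dag(default_args={"executor": "CeleryExecutor", "queue": QUEUE})
def my_dag():
    # Option A: rely on default_args so every task, TaskFlow included,
    # should pick up the queue.
    @task(task_id="pass_wats")
    def pass_wats(**context):
        return "ok"

    # Option B: set the queue per task on the decorator, mirroring what we
    # already do for explicit operators.
    @task(task_id="pass_wats_explicit", queue=QUEUE)
    def pass_wats_explicit(**context):
        return "ok"

    pass_wats()
    pass_wats_explicit()

my_dag()
```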

## What Works

- ✅ `KubernetesExecutor` tasks work perfectly (no issues)
- ✅ Airflow 2.x with `CeleryExecutor` worked flawlessly
- ✅ Explicit operator tasks with `queue=queue` parameter work correctly
- ✅ Tasks correctly show `executor='CeleryExecutor'` and 
`queue='vc-sqs-celery-worker.fifo'` when inspected

## Questions for the Community

1. Why does the API server return `404` "Task Instance not found" when the 
worker tries to start a
task that the scheduler created seconds earlier? Is there a synchronization 
issue between
scheduler, API server, and executors?
2. Is the per-task `executor=` parameter fully supported in Airflow 3.x mixed 
executor setups? The
documentation suggests it is (since 2.6+), but we're experiencing critical 
failures.
3. For TaskFlow tasks (`@task` decorator) with mixed executors:
    - Should `queue` be specified in `default_args`?
    - Or is there a different mechanism for TaskFlow tasks?
    - Why doesn't the `queue` parameter set in DAG code propagate to TaskFlow tasks?
4. Is this a known issue being tracked? We found similar reports on Slack 
(users experiencing
identical symptoms with 3.1.5 mixed executors) but no GitHub issues.
5. State desync between executor and scheduler - Is this expected behavior or a 
bug? The executor
reports `"failed"` but the task instance remains in `"running"` state.
6. Are there recommended workarounds for 3.x mixed executors, or should we 
avoid this
configuration entirely until fixed?

## What We've Tried

1. Increased `AIRFLOW__CELERY__OPERATION_TIMEOUT` from `30s` to `300s`
2. Verified scheduler and API server use identical database connections
3. Confirmed Celery queues are properly configured in `predefined_queues`
4. Verified tasks have correct `executor` and `queue` attributes when inspected
5. Set `AIRFLOW__CELERY__DEFAULT_QUEUE` - this doesn't resolve TaskFlow task routing
6. Despite all of the above, every `CeleryExecutor` task still fails with "Task Instance not found"

## Impact

This is blocking our Airflow 3.x migration. We cannot meet our SLAs without 
`CeleryExecutor` for
high-concurrency dbt workloads. Reverting to Airflow 2.x is not a long-term 
option.

Any guidance, insights, or pointers to related issues would be greatly 
appreciated! Happy to
provide additional logs, configuration details, or help test patches.


GitHub link: https://github.com/apache/airflow/discussions/61468
