GitHub user matthewblock edited a discussion: Getting value of Airflow variable within DeadlineAlert callback?
I'm doing some exploration of migrating my team from Airflow 2 to 3, and I'm trying to convert our SLA callback code to use the new DeadlineAlert (which seems much better so far!) We store our alert configurations in Airflow variables because we have several test environments and a production environment, and we need different settings for each (e.g. which webhook to use, whether it's enabled, etc.) In our SLA miss callback, we could use `Variable.get()` to fetch our config, but in a DeadlineAlert callback, I'm running into issues because of `async`. When I use `Variable.get()` within the callback, the error is ```You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.``` I believe this happens because there is some async-sync bridging going on when Airflow queries the meta database. To solve this, I tried using `asyncio.to_thread(Variable.get, 'my_var')` but then I get ```RuntimeError: Response read out of order! Got frame.id=16, expect_id=17```. I'm guessing I'm running into issues with the event loop. I tried locking the thread but couldn't get that to work either. I also tried passing the variables to `dag.deadline` via kwargs using jinja templates in the `AsyncCallback` class, but [in the docs](https://airflow.apache.org/docs/apache-airflow/stable/howto/deadline-alerts.html#templating-and-context) it says only variables in context get rendered, so this didn't work: ``` with DAG( ... deadline=DeadlineAlert( reference=DeadlineReference.DAGRUN_LOGICAL_DATE, interval=timedelta(seconds=10), callback=AsyncCallback( notify_teams_on_deadline_alert, kwargs={"my_var": "{{ var.value.my_var }}"} # Doesn't get rendered ), ), ... ) as dag: ``` I'm a bit out of my depth with asyncio at this point. I know that `SyncCallback` is planned for a [future release](https://github.com/apache/airflow/blob/9ef4f12a5bcad1f7940d0f3fa8238a2ba4096114/task-sdk/src/airflow/sdk/definitions/deadline.py#L60), but has anyone gotten this to work with `AsyncCallback`? Is there something obvious I'm supposed to be doing? GitHub link: https://github.com/apache/airflow/discussions/56781 ---- This is an automatically sent email for [email protected]. To unsubscribe, please send an email to: [email protected]
