This is an automated email from the ASF dual-hosted git repository.
o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66a1a643154 Fix callback state not updating from executor events due
to UUID type mismatch (#67542)
66a1a643154 is described below
commit 66a1a6431542e73d013e51a2c3476bdbfd25294d
Author: Jeongwoo Do <[email protected]>
AuthorDate: Wed May 27 03:10:55 2026 +0900
Fix callback state not updating from executor events due to UUID type
mismatch (#67542)
* Fix callback state not updating from executor events due to UUID type
mismatch
* fix logic
---
airflow-core/src/airflow/jobs/scheduler_job_runner.py | 3 ++-
airflow-core/tests/unit/models/test_callback.py | 14 ++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 6acd92fbe92..224659c4c4d 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,6 +32,7 @@ from datetime import date, datetime, timedelta
from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any, cast
+from uuid import UUID
from sqlalchemy import (
CTE,
@@ -1270,7 +1271,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Handle callback state events
for callback_id in callback_keys_with_events:
state, info = event_buffer.pop(callback_id)
- callback = session.get(Callback, str(callback_id))
+ callback = session.get(Callback, UUID(str(callback_id)))
if not callback:
# This should not normally happen - we just received an event for this callback.
# Only possible if callback was deleted mid-execution (e.g., cascade delete from DagRun deletion).
diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py
index 5903f541854..c31d1c99b49 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -237,6 +237,20 @@ class TestExecutorCallback:
callback.queue()
assert callback.state == CallbackState.QUEUED
+ def test_session_get_requires_uuid_not_str(self, session):
+ """Filtering the UUID id column with a plain str breaks on SQLite, so
+ callers must wrap with ``UUID(...)`` before querying."""
+ from uuid import UUID
+
+ callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH)
+ session.add(callback)
+ session.commit()
+ # ``id`` is filled by the ``uuid6.uuid7`` default at flush time, so it
+ # is only safe to stringify *after* the commit.
+ callback_id_str = str(callback.id)
+
+ assert session.get(Callback, UUID(callback_id_str)) is not None
+
class TestDagProcessorCallback:
def test_polymorphic_serde(self, session):