This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66a1a643154 Fix callback state not updating from executor events due to UUID type mismatch (#67542)
66a1a643154 is described below

commit 66a1a6431542e73d013e51a2c3476bdbfd25294d
Author: Jeongwoo Do <[email protected]>
AuthorDate: Wed May 27 03:10:55 2026 +0900

    Fix callback state not updating from executor events due to UUID type mismatch (#67542)
    
    * Fix callback state not updating from executor events due to UUID type mismatch
    
    * fix logic
---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py |  3 ++-
 airflow-core/tests/unit/models/test_callback.py       | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 6acd92fbe92..224659c4c4d 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,6 +32,7 @@ from datetime import date, datetime, timedelta
 from functools import lru_cache, partial
 from itertools import groupby
 from typing import TYPE_CHECKING, Any, cast
+from uuid import UUID
 
 from sqlalchemy import (
     CTE,
@@ -1270,7 +1271,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # Handle callback state events
         for callback_id in callback_keys_with_events:
             state, info = event_buffer.pop(callback_id)
-            callback = session.get(Callback, str(callback_id))
+            callback = session.get(Callback, UUID(str(callback_id)))
             if not callback:
                 # This should not normally happen - we just received an event for this callback.
                 # Only possible if callback was deleted mid-execution (e.g., cascade delete from DagRun deletion).
diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py
index 5903f541854..c31d1c99b49 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -237,6 +237,20 @@ class TestExecutorCallback:
         callback.queue()
         assert callback.state == CallbackState.QUEUED
 
+    def test_session_get_requires_uuid_not_str(self, session):
+        """Filtering the UUID id column with a plain str breaks on SQLite, so
+        callers must wrap with ``UUID(...)`` before querying."""
+        from uuid import UUID
+
+        callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH)
+        session.add(callback)
+        session.commit()
+        # ``id`` is filled by the ``uuid6.uuid7`` default at flush time, so it
+        # is only safe to stringify *after* the commit.
+        callback_id_str = str(callback.id)
+
+        assert session.get(Callback, UUID(callback_id_str)) is not None
+
 
 class TestDagProcessorCallback:
     def test_polymorphic_serde(self, session):

Reply via email to