This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cad63ce86af Add audit log if target dag partition mapper is wrong (#62509)
cad63ce86af is described below

commit cad63ce86af1b87a3d4631e03707b6999041b4cc
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 19 10:53:42 2026 +0800

    Add audit log if target dag partition mapper is wrong (#62509)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow-core/src/airflow/assets/manager.py         | 66 +++++++++++++----
 airflow-core/tests/unit/assets/test_manager.py     |  1 -
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 83 +++++++++++++++++++++-
 3 files changed, 136 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 9c287209172..a599e79018e 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -41,6 +41,7 @@ from airflow.models.asset import (
     DagScheduleAssetUriReference,
     PartitionedAssetKeyLog,
 )
+from airflow.models.log import Log
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
@@ -201,11 +202,7 @@ class AssetManager(LoggingMixin):
             )
         )
         if not asset_model:
-            msg = f"AssetModel {asset} not found; cannot create asset event."
-            cls.logger().warning(msg)
-            # if there is a task_instance, write to task log
-            if task_instance is not None and hasattr(task_instance, "log"):
-                task_instance.log.warning(msg)
+            cls.logger().warning("AssetModel %s not found; cannot create asset event.", asset)
             return None
 
         if not asset_model.active:
@@ -297,6 +294,7 @@ class AssetManager(LoggingMixin):
             dags_to_queue=dags_to_queue,
             partition_key=partition_key,
             event=asset_event,
+            task_instance=task_instance,
             session=session,
         )
         return asset_event
@@ -341,10 +339,10 @@ class AssetManager(LoggingMixin):
         dags_to_queue: set[DagModel],
         partition_key: str | None,
         event: AssetEvent,
+        task_instance: TaskInstance | None,
         session: Session,
     ) -> None:
-        log.debug("dags to queue", dags_to_queue=dags_to_queue)
-
+        log.debug("Dags to queue", dags_to_queue=dags_to_queue)
         if not dags_to_queue:
             return None
 
@@ -354,6 +352,7 @@ class AssetManager(LoggingMixin):
             partition_dags=partition_dags,
             event=event,
             partition_key=partition_key,
+            task_instance=task_instance,
             session=session,
         )
 
@@ -376,22 +375,38 @@ class AssetManager(LoggingMixin):
     @classmethod
     def _queue_partitioned_dags(
         cls,
+        *,
         asset_id: int,
         partition_dags: Iterable[DagModel],
         event: AssetEvent,
         partition_key: str | None,
+        task_instance: TaskInstance | None,
         session: Session,
     ) -> None:
         if partition_dags and not partition_key:
-            # TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
+            prefix = "Listening Dags are partition-aware but the run has no partition key"
             log.warning(
-                "Listening Dags are partition-aware but run has no partition key",
+                prefix,
                 listening_dags=[x.dag_id for x in partition_dags],
                 asset_id=asset_id,
                 run_id=event.source_run_id,
                 dag_id=event.source_dag_id,
                 task_id=event.source_task_id,
             )
+            msg = (
+                f"{prefix} (listening_dags={[x.dag_id for x in partition_dags]}, "
+                f"asset_id={asset_id}, "
+                f"run_id={event.source_run_id}, "
+                f"dag_id={event.source_dag_id}, "
+                f"task_id={event.source_task_id})"
+            )
+            session.add(
+                Log(
+                    event="missing partition key",
+                    extra=msg,
+                    task_instance=task_instance,
+                )
+            )
             return
 
         for target_dag in partition_dags:
@@ -409,9 +424,36 @@ class AssetManager(LoggingMixin):
+            if (asset_model := session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None:
+                raise RuntimeError(f"Could not find asset for asset_id={asset_id}")
 
-            target_key = timetable.get_partition_mapper(
-                name=asset_model.name, uri=asset_model.uri
-            ).to_downstream(partition_key)
+            try:
+                # We'll need to catch every possible exception happen when mapping partition_key.
+                target_key = timetable.get_partition_mapper(
+                    name=asset_model.name, uri=asset_model.uri
+                ).to_downstream(partition_key)
+            except Exception as err:
+                log.exception(
+                    "Could not map partition key for asset in target Dag. "
+                    "This likely indicates the target Dag's partition mapper "
+                    "is misconfigured, or does not support this partition key.",
+                    partition_key=partition_key,
+                    asset=asset_model,
+                    target_dag=target_dag,
+                )
+                log_extra = (
+                    f"Could not map partition_key '{partition_key}' for asset "
+                    f"(name='{asset_model.name}', uri='{asset_model.uri}') in target Dag "
+                    f"'{target_dag.dag_id}'. This likely indicates that the partition "
+                    f"mapper in the target Dag is misconfigured or does not support this "
+                    f"partition key.\n{type(err).__name__}: {err}"
+                )
+                session.add(
+                    Log(
+                        event="failed to map partition_key",
+                        extra=log_extra,
+                        task_instance=task_instance,
+                    )
+                )
+                continue
+
             if is_container(target_key):
                 # TODO (AIP-76): This never happens now. When we implement
                 # one-to-many partition key mapping, this should also add a
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index a3402e8e0ec..82477042d86 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -90,7 +90,6 @@ class TestAssetManager:
         # AssetDagRunQueue rows
         mock_session.add.assert_not_called()
         mock_session.merge.assert_not_called()
-        mock_task_instance.log.warning.assert_called()
 
     @pytest.mark.usefixtures("dag_maker", "testing_dag_bundle")
     def test_register_asset_change(self, session, mock_task_instance):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4511c333689..23fbecbf73a 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -85,7 +85,16 @@ from airflow.partition_mappers.base import PartitionMapper as CorePartitionMappe
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher, IdentityMapper, task
+from airflow.sdk import (
+    DAG,
+    Asset,
+    AssetAlias,
+    AssetWatcher,
+    CronPartitionTimetable,
+    HourlyMapper,
+    IdentityMapper,
+    task,
+)
 from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
 from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
 from airflow.serialization.definitions.dag import SerializedDAG
@@ -8873,6 +8882,78 @@ def _produce_and_register_asset_event(
     return apdr
 
 
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session: Session):
+    session.execute(delete(Log))
+    asset_1 = Asset(name="asset-1")
+    with dag_maker(
+        dag_id="asset-event-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=HourlyMapper(),
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    with dag_maker(
+        dag_id="asset-event-producer",
+        schedule=CronPartitionTimetable("* * * * *", timezone="UTC"),
+        session=session,
+    ) as dag:
+        EmptyOperator(task_id="hi", outlets=[asset_1])
+
+    partition_key = "an invalid key for HourlyMapper"
+    dr = dag_maker.create_dagrun(partition_key=partition_key, session=session)
+    [ti] = dr.get_task_instances(session=session)
+    session.commit()
+
+    serialized_outlets = dag.get_task("hi").outlets
+    TaskInstance.register_asset_changes_in_db(
+        ti=ti,
+        task_outlets=[o.asprofile() for o in serialized_outlets],
+        outlet_events=[],
+        session=session,
+    )
+    session.commit()
+    event = session.scalar(
+        select(AssetEvent).where(
+            AssetEvent.source_dag_id == dag.dag_id,
+            AssetEvent.source_run_id == dr.run_id,
+        )
+    )
+    assert event is not None
+    assert event.partition_key == partition_key
+    apdr = session.scalar(
+        select(AssetPartitionDagRun)
+        .join(
+            PartitionedAssetKeyLog,
+            PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
+        )
+        .where(PartitionedAssetKeyLog.asset_event_id == event.id)
+    )
+    assert apdr is None
+
+    partition_dags = runner._create_dagruns_for_partitioned_asset_dags(session=session)
+    assert len(partition_dags) == 0
+    assert partition_dags == set()
+
+    audit_log = session.scalar(select(Log))
+    assert audit_log is not None
+    assert audit_log.extra == (
+        "Could not map partition_key 'an invalid key for HourlyMapper' "
+        "for asset (name='asset-1', uri='asset-1') in target Dag 'asset-event-consumer'. "
+        "This likely indicates that the partition mapper in the target Dag is misconfigured or "
+        "does not support this partition key.\n"
+        "ValueError: time data 'an invalid key for HourlyMapper' does not match format '%Y-%m-%dT%H:%M:%S'"
+    )
+
+
 @pytest.mark.need_serialized_dag
 @pytest.mark.usefixtures("clear_asset_partition_rows")
 def test_partitioned_dag_run_with_customized_mapper(

Reply via email to