This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 19501149482 feat: Add JobDependenciesRunFacet to asset-triggered OL DAG events (#59521)
19501149482 is described below

commit 19501149482fcd21602d631ecc1ee3f2a408b0c7
Author: Kacper Muda <[email protected]>
AuthorDate: Fri Jan 9 16:15:59 2026 +0100

    feat: Add JobDependenciesRunFacet to asset-triggered OL DAG events (#59521)
---
 .../unit/google/cloud/operators/test_dataproc.py   |  92 ++-
 .../providers/openlineage/plugins/adapter.py       |  39 +-
 .../providers/openlineage/plugins/listener.py      |   5 +
 .../airflow/providers/openlineage/utils/utils.py   | 367 ++++++++++-
 .../tests/unit/openlineage/plugins/test_adapter.py |  22 +-
 .../tests/unit/openlineage/utils/test_utils.py     | 724 ++++++++++++++++++++-
 6 files changed, 1187 insertions(+), 62 deletions(-)
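
For orientation: the facet this commit emits is assembled in _build_job_dependency_facet
(utils.py hunk below). A minimal sketch of the payload, using the same openlineage-python
classes the new code calls; the namespace, job name, and run ID values here are
hypothetical, and the job_dependencies_run module assumes an openlineage-python release
that ships it:

    from openlineage.client.facet_v2 import job_dependencies_run

    # One upstream dependency, mirroring the constructor calls in utils.py below.
    dependency = job_dependencies_run.JobDependency(
        job=job_dependencies_run.JobIdentifier(namespace="default", name="upstream_dag.producer_task"),
        run=job_dependencies_run.RunIdentifier(runId="11111111-1111-1111-1111-111111111111"),
        dependency_type="IMPLICIT_ASSET_DEPENDENCY",
    )
    facet = {"jobDependencies": job_dependencies_run.JobDependenciesRunFacet(upstream=[dependency])}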

diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
index e2db6935fc2..17b957aaca3 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py
@@ -440,10 +440,10 @@ OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_SPARK_PROPERTIES = {
 OPENLINEAGE_PARENT_JOB_EXAMPLE_SPARK_PROPERTIES = {
     "spark.openlineage.parentJobName": "dag_id.task_id",
     "spark.openlineage.parentJobNamespace": "default",
-    "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267",
+    "spark.openlineage.parentRunId": "11111111-1111-1111-1111-111111111111",
     "spark.openlineage.rootParentJobName": "dag_id",
     "spark.openlineage.rootParentJobNamespace": "default",
-    "spark.openlineage.rootParentRunId": 
"01931885-2800-7be7-aa8d-aaa15c337267",
+    "spark.openlineage.rootParentRunId": 
"22222222-2222-2222-2222-222222222222",
 }
 
 
@@ -1430,13 +1430,15 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
         op.execute(context=self.mock_context)
         assert not mock_defer.called
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_parent_job_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, task_ol_run_id, dag_ol_run_id
     ):
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         job_config = {
             "placement": {"cluster_name": CLUSTER_NAME},
             "pyspark_job": {
@@ -1456,10 +1458,10 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
                     "spark.openlineage.transport.type": "console",
                     "spark.openlineage.parentJobName": "dag_id.task_id",
                     "spark.openlineage.parentJobNamespace": "default",
-                    "spark.openlineage.parentRunId": 
"01931885-2800-7be7-aa8d-aaa15c337267",
+                    "spark.openlineage.parentRunId": 
"11111111-1111-1111-1111-111111111111",
                     "spark.openlineage.rootParentJobName": "dag_id",
                     "spark.openlineage.rootParentJobNamespace": "default",
-                    "spark.openlineage.rootParentRunId": 
"01931885-2800-7be7-aa8d-aaa15c337267",
+                    "spark.openlineage.rootParentRunId": 
"22222222-2222-2222-2222-222222222222",
                 },
             },
         }
@@ -1499,15 +1501,17 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_http_transport_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
@@ -1556,15 +1560,17 @@ class TestDataprocSubmitJobOperator(DataprocJobTestBase):
             metadata=METADATA,
         )
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_all_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
@@ -2591,14 +2597,16 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
         )
         mock_op.return_value.result.assert_not_called()
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_parent_job_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         template = {
             **WORKFLOW_TEMPLATE,
             "jobs": [
@@ -2643,10 +2651,10 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
                             "spark.sql.shuffle.partitions": "1",
                             "spark.openlineage.parentJobName": 
"dag_id.task_id",
                             "spark.openlineage.parentJobNamespace": "default",
-                            "spark.openlineage.parentRunId": 
"01931885-2800-7be7-aa8d-aaa15c337267",
+                            "spark.openlineage.parentRunId": 
"11111111-1111-1111-1111-111111111111",
                             "spark.openlineage.rootParentJobName": "dag_id",
                             "spark.openlineage.rootParentJobNamespace": 
"default",
-                            "spark.openlineage.rootParentRunId": 
"01931885-2800-7be7-aa8d-aaa15c337267",
+                            "spark.openlineage.rootParentRunId": 
"22222222-2222-2222-2222-222222222222",
                         },
                     },
                 },
@@ -2784,15 +2792,17 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
             metadata=METADATA,
         )
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_transport_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
@@ -2892,15 +2902,17 @@ class TestDataprocWorkflowTemplateInstantiateInlineOperator:
             metadata=METADATA,
         )
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_all_info_injection(
-        self, mock_hook, mock_ol_accessible, mock_ol_listener, mock_static_uuid
+        self, mock_hook, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
@@ -3419,7 +3431,8 @@ class TestDataprocCreateBatchOperator:
         mock_log.info.assert_any_call("Batch with given id already exists.")
 
     @mock.patch.object(DataprocCreateBatchOperator, "log", new_callable=mock.MagicMock)
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
@@ -3428,11 +3441,13 @@ class TestDataprocCreateBatchOperator:
         mock_hook,
         to_dict_mock,
         mock_ol_accessible,
-        mock_static_uuid,
+        task_ol_run_id,
+        dag_ol_run_id,
         mock_log,
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         expected_batch = {
             **BATCH,
             "labels": EXPECTED_LABELS,
@@ -3474,16 +3489,25 @@ class TestDataprocCreateBatchOperator:
         mock_log.info.assert_any_call("Batch job %s completed.\nDriver logs: 
%s", BATCH_ID, logs_link)
 
     @mock.patch.object(DataprocCreateBatchOperator, "log", new_callable=mock.MagicMock)
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_transport_info_injection(
-        self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, mock_static_uuid, mock_log
+        self,
+        mock_hook,
+        to_dict_mock,
+        mock_ol_accessible,
+        mock_ol_listener,
+        task_ol_run_id,
+        dag_ol_run_id,
+        mock_log,
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
@@ -3533,16 +3557,18 @@ class TestDataprocCreateBatchOperator:
             logs_link,
         )
 
-    @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
+    @mock.patch("airflow.providers.openlineage.plugins.adapter.build_task_instance_ol_run_id")
     @mock.patch("airflow.providers.openlineage.utils.spark.get_openlineage_listener")
     @mock.patch("airflow.providers.google.cloud.openlineage.utils._is_openlineage_provider_accessible")
     @mock.patch(DATAPROC_PATH.format("Batch.to_dict"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_openlineage_all_info_injection(
-        self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, mock_static_uuid
+        self, mock_hook, to_dict_mock, mock_ol_accessible, mock_ol_listener, task_ol_run_id, dag_ol_run_id
     ):
         mock_ol_accessible.return_value = True
-        mock_static_uuid.return_value = "01931885-2800-7be7-aa8d-aaa15c337267"
+        task_ol_run_id.return_value = "11111111-1111-1111-1111-111111111111"
+        dag_ol_run_id.return_value = "22222222-2222-2222-2222-222222222222"
         fake_listener = mock.MagicMock()
         mock_ol_listener.return_value = fake_listener
         fake_listener.adapter.get_or_create_openlineage_client.return_value.transport = HttpTransport(
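
The assertions above can pin exact UUID strings because the new run-ID helpers are
deterministic. A minimal sketch of the idea behind build_dag_run_ol_run_id (defined in
the utils.py hunk further down); the "default" namespace and the dates are illustrative:

    from datetime import datetime, timezone

    from openlineage.client.uuid import generate_static_uuid

    logical_date = datetime(2024, 1, 1, 12, tzinfo=timezone.utc)
    # data mirrors f"{namespace}.{dag_id}.{clear_number}" from utils.py.
    run_id = str(generate_static_uuid(instant=logical_date, data=b"default.test_dag.0"))
    # Same inputs always produce the same UUID:
    assert run_id == str(generate_static_uuid(instant=logical_date, data=b"default.test_dag.0"))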
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index 1ed20b70cc2..eaca60bd804 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -34,14 +34,16 @@ from openlineage.client.facet_v2 import (
     ownership_job,
     tags_job,
 )
-from openlineage.client.uuid import generate_static_uuid
 
 from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
+    build_dag_run_ol_run_id,
+    build_task_instance_ol_run_id,
     get_airflow_debug_facet,
     get_airflow_state_run_facet,
+    get_dag_job_dependency_facet,
     get_processing_engine_facet,
 )
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -123,12 +125,7 @@ class OpenLineageAdapter(LoggingMixin):
 
     @staticmethod
     def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: int) -> str:
-        return str(
-            generate_static_uuid(
-                instant=logical_date,
-                data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
-            )
-        )
+        return build_dag_run_ol_run_id(dag_id=dag_id, logical_date=logical_date, clear_number=clear_number)
 
     @staticmethod
     def build_task_instance_run_id(
@@ -138,11 +135,12 @@ class OpenLineageAdapter(LoggingMixin):
         logical_date: datetime,
         map_index: int,
     ):
-        return str(
-            generate_static_uuid(
-                instant=logical_date,
-                data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
-            )
+        return build_task_instance_ol_run_id(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            logical_date=logical_date,
+            map_index=map_index,
         )
 
     def emit(self, event: RunEvent):
@@ -365,6 +363,7 @@ class OpenLineageAdapter(LoggingMixin):
     def dag_started(
         self,
         dag_id: str,
+        run_id: str,
         logical_date: datetime,
         start_date: datetime,
         nominal_start_time: str | None,
@@ -374,10 +373,14 @@ class OpenLineageAdapter(LoggingMixin):
         run_facets: dict[str, RunFacet],
         clear_number: int,
         job_description: str | None,
+        is_asset_triggered: bool,
         job_description_type: str | None = None,
         job_facets: dict[str, JobFacet] | None = None,  # Custom job facets
     ):
         try:
+            job_dependency_facet = {}
+            if is_asset_triggered:
+                job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
             event = RunEvent(
                 eventType=RunState.START,
                 eventTime=start_date.isoformat(),
@@ -396,7 +399,7 @@ class OpenLineageAdapter(LoggingMixin):
                     ),
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
-                    run_facets={**run_facets, **get_airflow_debug_facet()},
+                    run_facets={**run_facets, **get_airflow_debug_facet(), **job_dependency_facet},
                 ),
                 inputs=[],
                 outputs=[],
@@ -424,9 +427,13 @@ class OpenLineageAdapter(LoggingMixin):
         owners: list[str] | None,
         run_facets: dict[str, RunFacet],
         job_description: str | None,
+        is_asset_triggered: bool,
         job_description_type: str | None = None,
     ):
         try:
+            job_dependency_facet = {}
+            if is_asset_triggered:
+                job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
             event = RunEvent(
                 eventType=RunState.COMPLETE,
                 eventTime=end_date.isoformat(),
@@ -446,6 +453,7 @@ class OpenLineageAdapter(LoggingMixin):
                     nominal_end_time=nominal_end_time,
                     run_facets={
                         **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
+                        **job_dependency_facet,
                         **get_airflow_debug_facet(),
                         **run_facets,
                     },
@@ -477,9 +485,13 @@ class OpenLineageAdapter(LoggingMixin):
         msg: str,
         run_facets: dict[str, RunFacet],
         job_description: str | None,
+        is_asset_triggered: bool,
         job_description_type: str | None = None,
     ):
         try:
+            job_dependency_facet = {}
+            if is_asset_triggered:
+                job_dependency_facet = get_dag_job_dependency_facet(dag_id=dag_id, dag_run_id=run_id)
             event = RunEvent(
                 eventType=RunState.FAIL,
                 eventTime=end_date.isoformat(),
@@ -502,6 +514,7 @@ class OpenLineageAdapter(LoggingMixin):
                             message=msg, programmingLanguage="python"
                         ),
                         **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
+                        **job_dependency_facet,
                         **get_airflow_debug_facet(),
                         **run_facets,
                     },
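
Note the merge order in the run_facets dicts above: with Python's {**a, **b} spread,
later keys win on collision. A standalone illustration with hypothetical facet names:

    base = {"parent": "facet-from-caller", "debug": "old"}
    extra = {"debug": "new", "jobDependencies": "dependency-facet"}
    merged = {**base, **extra}
    assert merged == {"parent": "facet-from-caller", "debug": "new", "jobDependencies": "dependency-facet"}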
diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 6a530c2f93a..bc94294d770 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -46,6 +46,7 @@ from airflow.providers.openlineage.utils.utils import (
     get_task_documentation,
     get_task_parent_run_facet,
     get_user_provided_run_facets,
+    is_dag_run_asset_triggered,
     is_operator_disabled,
     is_selective_lineage_enabled,
     print_warning,
@@ -668,6 +669,7 @@ class OpenLineageListener:
             self.submit_callable(
                 self.adapter.dag_started,
                 dag_id=dag_run.dag_id,
+                run_id=dag_run.run_id,
                 logical_date=date,
                 start_date=dag_run.start_date,
                 nominal_start_time=data_interval_start,
@@ -684,6 +686,7 @@ class OpenLineageListener:
                     **get_airflow_dag_run_facet(dag_run),
                     **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
                 },
+                is_asset_triggered=is_dag_run_asset_triggered(dag_run),
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_running", exc_info=e)
@@ -735,6 +738,7 @@ class OpenLineageListener:
                     **get_airflow_dag_run_facet(dag_run),
                     **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
                 },
+                is_asset_triggered=is_dag_run_asset_triggered(dag_run),
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_success", exc_info=e)
@@ -787,6 +791,7 @@ class OpenLineageListener:
                     **get_airflow_dag_run_facet(dag_run),
                     **get_dag_parent_run_facet(getattr(dag_run, "conf", {})),
                 },
+                is_asset_triggered=is_dag_run_asset_triggered(dag_run),
             )
         except BaseException as e:
             self.log.warning("OpenLineage received exception in method 
on_dag_run_failed", exc_info=e)
diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 91ccfcd13b1..ba45eac13de 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -27,13 +27,12 @@ from importlib import metadata
 from typing import TYPE_CHECKING, Any
 
 import attrs
-from openlineage.client.facet_v2 import parent_run
+from openlineage.client.facet_v2 import job_dependencies_run, parent_run
 from openlineage.client.utils import RedactMixin
+from openlineage.client.uuid import generate_static_uuid
 
 from airflow import __version__ as AIRFLOW_VERSION
 from airflow.exceptions import AirflowOptionalProviderFeatureException
-
-# TODO: move this maybe to Airflow's logic?
 from airflow.models import DagRun, TaskInstance, TaskReschedule
 from airflow.providers.common.compat.assets import Asset
 from airflow.providers.common.compat.module_loading import import_string
@@ -75,6 +74,7 @@ if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
     from openlineage.client.facet_v2 import RunFacet, processing_engine_run
 
+    from airflow.models.asset import AssetEvent
     from airflow.sdk.execution_time.secrets_masker import (
         Redactable,
         Redacted,
@@ -726,18 +726,21 @@ class DagRunInfo(InfoJsonEncodable):
     """Defines encoding DagRun object to JSON."""
 
     includes = [
+        "clear_number",
         "conf",
         "dag_id",
-        "data_interval_start",
         "data_interval_end",
-        "external_trigger",  # Removed in Airflow 3, use run_type instead
+        "data_interval_start",
+        "end_date",
         "execution_date",  # Airflow 2
+        "external_trigger",  # Removed in Airflow 3, use run_type instead
         "logical_date",  # Airflow 3
         "run_after",  # Airflow 3
         "run_id",
         "run_type",
         "start_date",
-        "end_date",
+        "triggered_by",
+        "triggering_user_name",  # Airflow 3
     ]
 
     casts = {
@@ -1000,6 +1003,358 @@ def get_airflow_state_run_facet(
     }
 
 
+def is_dag_run_asset_triggered(
+    dag_run: DagRun,
+):
+    """Return whether the given DAG run was triggered by an asset."""
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.utils.types import DagRunTriggeredByType
+
+        return dag_run.triggered_by == DagRunTriggeredByType.ASSET
+
+    # AF 2 Path
+    from airflow.models.dagrun import DagRunType
+
+    return dag_run.run_type == DagRunType.DATASET_TRIGGERED  # type: ignore[attr-defined]  # This attr is available on AF2, but mypy can't see it
+
+
+def build_task_instance_ol_run_id(
+    dag_id: str,
+    task_id: str,
+    try_number: int,
+    logical_date: datetime.datetime,
+    map_index: int,
+):
+    """
+    Generate a deterministic OpenLineage run ID for a task instance.
+
+    Args:
+        dag_id: The DAG identifier.
+        task_id: The task identifier.
+        try_number: The task try number.
+        logical_date: The logical execution date from dagrun.
+        map_index: The task map index.
+
+    Returns:
+        A deterministic OpenLineage run ID for the task instance.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}.{map_index}".encode(),
+        )
+    )
+
+
+def is_valid_uuid(uuid_string: str | None) -> bool:
+    """Validate that a string is a valid UUID format."""
+    if uuid_string is None:
+        return False
+    try:
+        from uuid import UUID
+
+        UUID(uuid_string)
+        return True
+    except (ValueError, TypeError):
+        return False
+
+
+def build_dag_run_ol_run_id(dag_id: str, logical_date: datetime.datetime, clear_number: int) -> str:
+    """
+    Generate a deterministic OpenLineage run ID for a DAG run.
+
+    Args:
+        dag_id: The DAG identifier.
+        logical_date: The logical execution date.
+        clear_number: The DAG run clear number.
+
+    Returns:
+        A deterministic OpenLineage run ID for the DAG run.
+    """
+    return str(
+        generate_static_uuid(
+            instant=logical_date,
+            data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
+        )
+    )
+
+
+def _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id: str, dag_run_id: str) -> list[AssetEvent]:
+    """
+    Retrieve consumed asset events for a DagRun with relationships eagerly loaded.
+
+    Downstream code accesses source_task_instance, source_dag_run, and asset on each AssetEvent.
+    These relationships are lazy-loaded by default, which could cause an N+1 query problem
+    (2 + 3*N queries for N events). Using `joinedload` fetches everything in a single query.
+    The returned AssetEvent objects have all needed relationships pre-populated in memory,
+    so they can be safely used after the session is closed.
+
+    Returns:
+        AssetEvent objects with populated relationships, or an empty list if the DagRun is not found.
+    """
+    # This should only be used on scheduler, so DB access is allowed
+    from sqlalchemy import select
+    from sqlalchemy.orm import joinedload
+
+    from airflow.utils.session import create_session
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.models.asset import AssetEvent
+
+        options = (
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_dag_run),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.source_task_instance),
+            joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset),
+        )
+
+    else:  # AF2 path
+        from airflow.models.dataset import DatasetEvent
+
+        options = (
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_dag_run),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.source_task_instance),
+            joinedload(DagRun.consumed_dataset_events).joinedload(DatasetEvent.dataset),
+        )
+
+    with create_session() as session:
+        dag_run_with_events = session.scalar(
+            select(DagRun).where(DagRun.dag_id == dag_id).where(DagRun.run_id == dag_run_id).options(*options)
+        )
+
+    if not dag_run_with_events:
+        return []
+
+    if AIRFLOW_V_3_0_PLUS:
+        events = dag_run_with_events.consumed_asset_events
+    else:  # AF2 path
+        events = dag_run_with_events.consumed_dataset_events
+
+    return events
+
+
+def _extract_ol_info_from_asset_event(asset_event: AssetEvent) -> dict[str, str] | None:
+    """
+    Extract OpenLineage job information from an AssetEvent.
+
+    Information is gathered from multiple potential sources, checked in priority
+    order:
+    1. TaskInstance (primary): Provides the most complete and reliable context.
+    2. AssetEvent source fields (fallback): Offers basic `dag_id.task_id` metadata.
+    3. `asset_event.extra["openlineage"]` (last resort): May include user-provided OpenLineage details.
+
+    Args:
+        asset_event: The AssetEvent from which to extract job information.
+
+    Returns:
+        A dictionary containing `job_name`, `job_namespace`, and optionally
+        `run_id`, or `None` if insufficient information is available.
+    """
+    # First check for TaskInstance
+    if ti := asset_event.source_task_instance:
+        result = {
+            "job_name": get_job_name(ti),
+            "job_namespace": conf.namespace(),
+        }
+        source_dr = asset_event.source_dag_run
+        if source_dr:
+            logical_date = source_dr.logical_date  # Get logical date from DagRun for OL run_id generation
+            if AIRFLOW_V_3_0_PLUS and logical_date is None:
+                logical_date = source_dr.run_after
+            if logical_date is not None:
+                result["run_id"] = build_task_instance_ol_run_id(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    logical_date=logical_date,
+                    map_index=ti.map_index,
+                )
+        return result
+
+    # Then, check AssetEvent source_* fields
+    if asset_event.source_dag_id and asset_event.source_task_id:
+        return {
+            "job_name": 
f"{asset_event.source_dag_id}.{asset_event.source_task_id}",
+            "job_namespace": conf.namespace(),
+            # run_id cannot be constructed from these fields alone
+        }
+
+    # Lastly, check asset_event.extra["openlineage"]
+    if asset_event.extra:
+        ol_info_from_extra = asset_event.extra.get("openlineage")
+        if isinstance(ol_info_from_extra, dict):
+            job_name = ol_info_from_extra.get("parentJobName")
+            job_namespace = ol_info_from_extra.get("parentJobNamespace")
+            run_id = ol_info_from_extra.get("parentRunId")
+
+            if job_name and job_namespace:
+                result = {
+                    "job_name": str(job_name),
+                    "job_namespace": str(job_namespace),
+                }
+                if run_id:
+                    if not is_valid_uuid(str(run_id)):
+                        log.warning(
+                            "Invalid runId in AssetEvent.extra; ignoring 
value. event_id=%s, run_id=%s",
+                            asset_event.id,
+                            run_id,
+                        )
+                    else:
+                        result["run_id"] = str(run_id)
+                return result
+    return None
+
+
+def _get_ol_job_dependencies_from_asset_events(events: list[AssetEvent]) -> list[dict[str, Any]]:
+    """
+    Extract and deduplicate OpenLineage job dependencies from asset events.
+
+    This function processes a list of asset events, extracts OpenLineage dependency information
+    from all relevant sources, and deduplicates the results based on the tuple (job_namespace, job_name, run_id)
+    to prevent emitting duplicate dependencies. Multiple asset events from the same job but different
+    source runs/assets are aggregated into a single dependency entry with all source information preserved.
+
+    Args:
+        events: List of AssetEvent objects to process.
+
+    Returns:
+        A list of deduplicated dictionaries containing OpenLineage job dependency information.
+        Each dictionary includes job_name, job_namespace, optional run_id, and an asset_events
+        list containing source information from all aggregated events.
+    """
+    # Use a dictionary keyed by (namespace, job_name, run_id) to deduplicate
+    # Multiple asset events from the same task instance should only create one dependency
+    deduplicated_jobs: dict[tuple[str, str, str | None], dict[str, Any]] = {}
+
+    for asset_event in events:
+        # Extract OpenLineage information
+        ol_info = _extract_ol_info_from_asset_event(asset_event)
+
+        # Skip if we don't have minimum required info (job_name and namespace)
+        if not ol_info:
+            log.debug(
+                "Insufficient OpenLineage information, skipping asset event: 
%s",
+                str(asset_event),
+            )
+            continue
+
+        # Create deduplication key: (namespace, job_name, run_id)
+        # We deduplicate on job identity (namespace + name + run_id), not on source dag_run_id
+        # Multiple asset events from the same job but different source runs/assets are aggregated
+        dedup_key = (
+            ol_info["job_namespace"],
+            ol_info["job_name"],
+            ol_info.get("run_id"),
+        )
+
+        # Collect source information for this asset event
+        source_info = {
+            "dag_run_id": asset_event.source_run_id,
+            "asset_event_id": asset_event.id,
+            "asset_event_extra": asset_event.extra or None,
+            "asset_id": asset_event.asset_id if AIRFLOW_V_3_0_PLUS else 
asset_event.dataset_id,
+            "asset_uri": asset_event.uri,
+            "partition_key": getattr(asset_event, "partition_key", None),
+        }
+
+        if dedup_key not in deduplicated_jobs:
+            # First occurrence: create the job entry with initial source info
+            deduplicated_jobs[dedup_key] = {**ol_info, "asset_events": [source_info]}
+        else:
+            # Already seen: append source info to existing entry
+            deduplicated_jobs[dedup_key]["asset_events"].append(source_info)
+
+    result = list(deduplicated_jobs.values())
+    return result
+
+
+def _build_job_dependency_facet(
+    dag_id: str, dag_run_id: str
+) -> dict[str, job_dependencies_run.JobDependenciesRunFacet]:
+    """
+    Build the JobDependenciesRunFacet for a DagRun.
+
+    Args:
+        dag_id: The DAG identifier.
+        dag_run_id: The DagRun identifier.
+
+    Returns:
+        A dictionary containing the JobDependenciesRunFacet, or an empty dictionary if no dependencies are found.
+    """
+    log.info(
+        "Building OpenLineage JobDependenciesRunFacet for DagRun(dag_id=%s, 
run_id=%s).",
+        dag_id,
+        dag_run_id,
+    )
+    events = _get_eagerly_loaded_dagrun_consumed_asset_events(dag_id, dag_run_id)
+
+    if not events:
+        log.info("DagRun %s/%s has no consumed asset events", dag_id, 
dag_run_id)
+        return {}
+
+    ol_dependencies = _get_ol_job_dependencies_from_asset_events(events=events)
+
+    if not ol_dependencies:
+        log.info(
+            "No OpenLineage job dependencies generated from asset events 
consumed by DagRun %s/%s.",
+            dag_id,
+            dag_run_id,
+        )
+        return {}
+
+    upstream_dependencies = []
+    for job in ol_dependencies:
+        job_identifier = job_dependencies_run.JobIdentifier(
+            namespace=job["job_namespace"],
+            name=job["job_name"],
+        )
+
+        run_identifier = None
+        if job.get("run_id"):
+            run_identifier = job_dependencies_run.RunIdentifier(runId=job["run_id"])
+
+        job_dependency = job_dependencies_run.JobDependency(
+            job=job_identifier,
+            run=run_identifier,
+            dependency_type="IMPLICIT_ASSET_DEPENDENCY",
+        ).with_additional_properties(airflow={"asset_events": 
job.get("asset_events")})  # type: ignore[arg-type]  # Fixed in OL client 1.42, 
waiting for release
+
+        upstream_dependencies.append(job_dependency)
+
+    return {
+        "jobDependencies": job_dependencies_run.JobDependenciesRunFacet(
+            upstream=upstream_dependencies,
+        )
+    }
+
+
+def get_dag_job_dependency_facet(
+    dag_id: str, dag_run_id: str
+) -> dict[str, job_dependencies_run.JobDependenciesRunFacet]:
+    """
+    Safely retrieve the asset-triggered job dependency facet for a DagRun.
+
+    This function collects information about the asset events that triggered the specified DagRun,
+    including details about the originating DAG runs and task instances. If the DagRun was not triggered
+    by assets, or if any error occurs during lookup or processing, the function logs the error and returns
+    an empty dictionary. This guarantees that facet generation never raises exceptions and does not
+    interfere with event emission.
+
+    Args:
+        dag_id: The DAG identifier.
+        dag_run_id: The DagRun identifier.
+
+    Returns:
+        A dictionary with JobDependenciesRunFacet, or an empty dictionary
+        if the DagRun was not asset-triggered or if an error occurs.
+    """
+    try:
+        return _build_job_dependency_facet(dag_id=dag_id, dag_run_id=dag_run_id)
+    except Exception as e:
+        log.warning("Failed to build JobDependenciesRunFacet for DagRun %s/%s: 
%s.", dag_id, dag_run_id, e)
+        log.debug("Exception details:", exc_info=True)
+        return {}
+
+
 def _get_tasks_details(dag: DAG | SerializedDAG) -> dict:
     tasks = {
         single_task.task_id: {
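
The deduplication in _get_ol_job_dependencies_from_asset_events reduces to keying a dict
on job identity and appending source info on repeat hits. A minimal standalone sketch
with plain dicts standing in for AssetEvents:

    events = [
        {"job": ("default", "dag.task", "run-1"), "source": {"asset_event_id": 1}},
        {"job": ("default", "dag.task", "run-1"), "source": {"asset_event_id": 2}},
        {"job": ("default", "other.task", None), "source": {"asset_event_id": 3}},
    ]
    deduplicated = {}
    for event in events:
        entry = deduplicated.setdefault(event["job"], {"job": event["job"], "asset_events": []})
        entry["asset_events"].append(event["source"])
    assert len(deduplicated) == 2  # two distinct jobs; the first aggregates two events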
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
index ee22424bc78..22bfb73c20c 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
@@ -662,10 +662,10 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
 
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
[email protected]("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
[email protected]("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mock_debug_mode):
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, build_ol_id, mock_debug_mode):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -703,7 +703,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
             data_interval=(event_time, event_time),
         )
     dag_run.dag = dag
-    generate_static_uuid.return_value = random_uuid
+    build_ol_id.return_value = random_uuid
 
     job_facets = {**get_airflow_job_facet(dag_run)}
 
@@ -736,6 +736,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
     )
     adapter.dag_started(
         dag_id=dag_id,
+        run_id=run_id,
         start_date=event_time,
         logical_date=event_time,
         clear_number=0,
@@ -745,6 +746,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
         job_description=dag.description,
         job_description_type="text/plain",
         tags=["tag1", "tag2"],
+        is_asset_triggered=False,
         run_facets={
             "parent": parent_run.ParentRunFacet(
                 run=parent_run.Run(runId=random_uuid),
@@ -834,11 +836,11 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch.object(DagRun, "fetch_task_instances")
[email protected]("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
[email protected]("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_dag_complete_event(
-    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, mock_debug_mode
+    mock_stats_incr, mock_stats_timer, build_ol_id, mocked_fetch_tis, mock_debug_mode
 ):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
@@ -896,7 +898,7 @@ def test_emit_dag_complete_event(
     ti2.end_date = datetime.datetime(2022, 1, 1, 0, 14, 0)
 
     mocked_fetch_tis.return_value = [ti0, ti1, ti2]
-    generate_static_uuid.return_value = random_uuid
+    build_ol_id.return_value = random_uuid
 
     adapter.dag_success(
         dag_id=dag_id,
@@ -912,6 +914,7 @@ def test_emit_dag_complete_event(
         job_description_type="text/plain",
         nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(),
         nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(),
+        is_asset_triggered=False,
         run_facets={
             "parent": parent_run.ParentRunFacet(
                 run=parent_run.Run(runId=random_uuid),
@@ -1000,11 +1003,11 @@ def test_emit_dag_complete_event(
 
 @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
 @mock.patch.object(DagRun, "fetch_task_instances")
[email protected]("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
[email protected]("airflow.providers.openlineage.plugins.adapter.build_dag_run_ol_run_id")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
 def test_emit_dag_failed_event(
-    mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, mock_debug_mode
+    mock_stats_incr, mock_stats_timer, build_ol_id, mocked_fetch_tis, mock_debug_mode
 ):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
@@ -1062,7 +1065,7 @@ def test_emit_dag_failed_event(
 
     mocked_fetch_tis.return_value = [ti0, ti1, ti2]
 
-    generate_static_uuid.return_value = random_uuid
+    build_ol_id.return_value = random_uuid
 
     adapter.dag_failed(
         dag_id=dag_id,
@@ -1090,6 +1093,7 @@ def test_emit_dag_failed_event(
             ),
             "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag 
desc"}, dagRun=dag_run),
         },
+        is_asset_triggered=False,
     )
 
     client.emit.assert_called_once_with(
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 6bbffd2cac2..cb3eaee8218 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -42,14 +42,19 @@ from airflow.providers.openlineage.utils.utils import (
     TaskInfo,
     TaskInfoComplete,
     TaskInstanceInfo,
+    _extract_ol_info_from_asset_event,
+    _get_ol_job_dependencies_from_asset_events,
     _get_openlineage_data_from_dagrun_conf,
     _get_task_groups_details,
     _get_tasks_details,
     _truncate_string_to_byte_size,
+    build_dag_run_ol_run_id,
+    build_task_instance_ol_run_id,
     get_airflow_dag_run_facet,
     get_airflow_job_facet,
     get_airflow_state_run_facet,
     get_dag_documentation,
+    get_dag_job_dependency_facet,
     get_dag_parent_run_facet,
     get_fully_qualified_class_name,
     get_job_name,
@@ -60,6 +65,8 @@ from airflow.providers.openlineage.utils.utils import (
     get_task_documentation,
     get_task_parent_run_facet,
     get_user_provided_run_facets,
+    is_dag_run_asset_triggered,
+    is_valid_uuid,
 )
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.timetables.events import EventsTimetable
@@ -168,6 +175,8 @@ def test_get_airflow_dag_run_facet():
     dagrun_mock.run_after = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)
     dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)
     dagrun_mock.end_date = datetime.datetime(2024, 6, 1, 1, 2, 14, 34172, tzinfo=datetime.timezone.utc)
+    dagrun_mock.triggering_user_name = "user1"
+    dagrun_mock.triggered_by = "something"
     dagrun_mock.dag_versions = [
         MagicMock(
             bundle_name="bundle_name",
@@ -197,6 +206,7 @@ def test_get_airflow_dag_run_facet():
             dag=expected_dag_info,
             dagRun={
                 "conf": {},
+                "clear_number": 0,
                 "dag_id": "dag",
                 "data_interval_start": "2024-06-01T01:02:03+00:00",
                 "data_interval_end": "2024-06-01T02:03:04+00:00",
@@ -213,6 +223,8 @@ def test_get_airflow_dag_run_facet():
                 "dag_bundle_version": "bundle_version",
                 "dag_version_id": "version_id",
                 "dag_version_number": "version_number",
+                "triggering_user_name": "user1",
+                "triggered_by": "something",
             },
         )
     }
@@ -1908,7 +1920,7 @@ class TestDagInfoAirflow2:
         }
 
 
[email protected](AIRFLOW_V_3_0_PLUS, reason="Airflow < 3.0 tests")
[email protected](AIRFLOW_V_3_0_PLUS, reason="Airflow 2 tests")
 class TestDagInfoAirflow210:
     def test_dag_info_schedule_single_dataset_directly(self):
         dag = DAG(
@@ -2294,10 +2306,12 @@ def test_dagrun_info_af3(mocked_dag_versions):
     )
     assert dagrun.dag_versions == [dv1, dv2]
     dagrun.end_date = date + datetime.timedelta(seconds=74, microseconds=546)
+    dagrun.triggering_user_name = "my_user"
 
     result = DagRunInfo(dagrun)
     assert dict(result) == {
         "conf": {"a": 1},
+        "clear_number": 0,
         "dag_id": "dag_id",
         "data_interval_end": "2024-06-01T00:00:00+00:00",
         "data_interval_start": "2024-06-01T00:00:00+00:00",
@@ -2312,6 +2326,8 @@ def test_dagrun_info_af3(mocked_dag_versions):
         "dag_bundle_version": "bundle_version",
         "dag_version_id": "version_id",
         "dag_version_number": "version_number",
+        "triggered_by": DagRunTriggeredByType.UI,
+        "triggering_user_name": "my_user",
     }
 
 
@@ -2338,6 +2354,7 @@ def test_dagrun_info_af2():
     result = DagRunInfo(dagrun)
     assert dict(result) == {
         "conf": {"a": 1},
+        "clear_number": 0,
         "dag_id": "dag_id",
         "data_interval_end": "2024-06-01T00:00:00+00:00",
         "data_interval_start": "2024-06-01T00:00:00+00:00",
@@ -2824,3 +2841,708 @@ class TestGetAirflowStateRunFacet:
         )
 
         assert result["airflowState"].tasksDuration["terminated_task"] == 0.0
+
+
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Airflow 3 specific test")
+def test_is_dag_run_asset_triggered_af3():
+    """Test is_dag_run_asset_triggered for Airflow 3."""
+    from airflow.models.dagrun import DagRunTriggeredByType
+
+    dag_run = MagicMock(triggered_by=DagRunTriggeredByType.ASSET)
+
+    assert is_dag_run_asset_triggered(dag_run) is True
+
+    dag_run.triggered_by = DagRunTriggeredByType.TIMETABLE
+    assert is_dag_run_asset_triggered(dag_run) is False
+
+
[email protected](AIRFLOW_V_3_0_PLUS, reason="Airflow 2 specific test")
+def test_is_dag_run_asset_triggered_af2():
+    """Test is_dag_run_asset_triggered for Airflow 2."""
+    from airflow.models.dagrun import DagRunType
+
+    dag_run = MagicMock(run_type=DagRunType.DATASET_TRIGGERED)
+
+    assert is_dag_run_asset_triggered(dag_run) is True
+
+    dag_run.run_type = DagRunType.MANUAL
+    assert is_dag_run_asset_triggered(dag_run) is False
+
+
+def test_build_task_instance_ol_run_id():
+    """Test deterministic UUID generation for task instance."""
+    logical_date = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+    run_id = build_task_instance_ol_run_id(
+        dag_id="test_dag",
+        task_id="test_task",
+        try_number=1,
+        logical_date=logical_date,
+        map_index=0,
+    )
+
+    assert run_id == "018cc4e5-2200-7b27-b511-a7a14aa0662a"
+
+    # Should be deterministic - same inputs produce same output
+    run_id2 = build_task_instance_ol_run_id(
+        dag_id="test_dag",
+        task_id="test_task",
+        try_number=1,
+        logical_date=logical_date,
+        map_index=0,
+    )
+    assert run_id == run_id2
+
+    # Different inputs should produce different outputs
+    run_id3 = build_task_instance_ol_run_id(
+        dag_id="test_dag",
+        task_id="test_task",
+        try_number=2,  # Different try_number
+        logical_date=logical_date,
+        map_index=0,
+    )
+    assert run_id != run_id3
+
+
+def test_build_dag_run_ol_run_id():
+    """Test deterministic UUID generation for DAG run."""
+    logical_date = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+    run_id = build_dag_run_ol_run_id(
+        dag_id="test_dag",
+        logical_date=logical_date,
+        clear_number=0,
+    )
+    assert run_id == "018cc4e5-2200-725f-8091-596ad71712b2"
+
+    # Should be deterministic - same inputs produce same output
+    run_id2 = build_dag_run_ol_run_id(
+        dag_id="test_dag",
+        logical_date=logical_date,
+        clear_number=0,
+    )
+    assert run_id == run_id2
+
+    # Different inputs should produce different outputs
+    run_id3 = build_dag_run_ol_run_id(
+        dag_id="test_dag",
+        logical_date=logical_date,
+        clear_number=1,  # Different clear_number
+    )
+    assert run_id != run_id3
+
+
+def test_validate_uuid_valid():
+    """Test validation of valid UUID strings."""
+    valid_uuids = [
+        "550e8400-e29b-41d4-a716-446655440000",
+        "6ba7b810-9dad-11d1-80b4-00c04fd430c8",
+        "00000000-0000-0000-0000-000000000000",
+    ]
+    for uuid_str in valid_uuids:
+        assert is_valid_uuid(uuid_str) is True
+
+
+def test_validate_uuid_invalid():
+    """Test validation of invalid UUID strings."""
+    invalid_uuids = [
+        "not-a-uuid",
+        "550e8400-e29b-41d4-a716",  # Too short
+        "550e8400-e29b-41d4-a716-446655440000-extra",  # Too long
+        "550e8400-e29b-41d4-a716-44665544000g",  # Invalid character
+        "",
+        "123",
+        None,
+    ]
+    for uuid_str in invalid_uuids:
+        assert is_valid_uuid(uuid_str) is False
+
+
+class TestExtractOlInfoFromAssetEvent:
+    """Tests for _extract_ol_info_from_asset_event function."""
+
+    def test_extract_ol_info_from_task_instance(self):
+        """Test extraction from TaskInstance (priority 1)."""
+        logical_date = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+
+        # Mock TaskInstance - using MagicMock without spec to avoid SQLAlchemy mapper inspection
+        ti = MagicMock()
+        ti.dag_id = "source_dag"
+        ti.task_id = "source_task"
+        ti.try_number = 1
+        ti.map_index = 0
+
+        # Mock DagRun
+        source_dr = MagicMock()
+        source_dr.logical_date = logical_date
+        source_dr.run_after = None
+
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = ti
+        asset_event.source_dag_run = source_dr
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {}
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
+        expected_run_id = build_task_instance_ol_run_id(
+            dag_id="source_dag",
+            task_id="source_task",
+            try_number=1,
+            logical_date=logical_date,
+            map_index=0,
+        )
+        assert result == {
+            "job_name": "source_dag.source_task",
+            "job_namespace": namespace(),
+            "run_id": expected_run_id,
+        }
+
+    def test_extract_ol_info_from_task_instance_no_logical_date(self):
+        """Test extraction from TaskInstance without logical_date."""
+        # Mock TaskInstance
+        ti = MagicMock()
+        ti.dag_id = "source_dag"
+        ti.task_id = "source_task"
+        ti.try_number = 1
+        ti.map_index = 0
+
+        # Mock DagRun with None logical_date
+        source_dr = MagicMock()
+        source_dr.logical_date = None
+        source_dr.run_after = None
+
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = ti
+        asset_event.source_dag_run = source_dr
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {}
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
+        # run_id should not be included if logical_date is None
+        assert result == {
+            "job_name": "source_dag.source_task",
+            "job_namespace": namespace(),
+        }
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Airflow 3 specific test")
+    def test_extract_ol_info_from_task_instance_run_after_fallback(self):
+        """Test extraction from TaskInstance with run_after fallback (AF3)."""
+        run_after = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+
+        # Mock TaskInstance
+        ti = MagicMock()
+        ti.dag_id = "source_dag"
+        ti.task_id = "source_task"
+        ti.try_number = 1
+        ti.map_index = 0
+
+        # Mock DagRun with None logical_date but run_after set
+        source_dr = MagicMock()
+        source_dr.logical_date = None
+        source_dr.run_after = run_after
+
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = ti
+        asset_event.source_dag_run = source_dr
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {}
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
+        # Should use run_after as fallback for logical_date
+        expected_run_id = build_task_instance_ol_run_id(
+            dag_id="source_dag",
+            task_id="source_task",
+            try_number=1,
+            logical_date=run_after,
+            map_index=0,
+        )
+        assert result == {
+            "job_name": "source_dag.source_task",
+            "job_namespace": namespace(),
+            "run_id": expected_run_id,
+        }
+
+    def test_extract_ol_info_from_source_fields(self):
+        """Test extraction from AssetEvent source fields (priority 2)."""
+        # Mock AssetEvent without TaskInstance
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = "source_dag"
+        asset_event.source_task_id = "source_task"
+        asset_event.extra = {}
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
+        # run_id cannot be constructed from source fields alone
+        assert result == {
+            "job_name": "source_dag.source_task",
+            "job_namespace": namespace(),
+        }
+
+    def test_extract_ol_info_from_extra(self):
+        """Test extraction from asset_event.extra (priority 3)."""
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {
+            "openlineage": {
+                "parentJobName": "extra_job",
+                "parentJobNamespace": "extra_namespace",
+                "parentRunId": "550e8400-e29b-41d4-a716-446655440000",
+            }
+        }
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
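+        # Each value is taken verbatim from the "openlineage" dict in extra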
+        assert result == {
+            "job_name": "extra_job",
+            "job_namespace": "extra_namespace",
+            "run_id": "550e8400-e29b-41d4-a716-446655440000",
+        }
+
+    def test_extract_ol_info_from_extra_no_run_id(self):
+        """Test extraction from asset_event.extra without run_id."""
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {
+            "openlineage": {
+                "parentJobName": "extra_job",
+                "parentJobNamespace": "extra_namespace",
+            }
+        }
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
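+        # parentRunId is optional; job name and namespace alone are returned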
+        assert result == {
+            "job_name": "extra_job",
+            "job_namespace": "extra_namespace",
+        }
+
+    def test_extract_ol_info_from_extra_no_job_name(self):
+        """Test extraction from asset_event.extra without job_name."""
+        # Mock AssetEvent
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {
+            "openlineage": {
+                "parentRunId": "550e8400-e29b-41d4-a716-446655440000",
+                "parentJobNamespace": "extra_namespace",
+            }
+        }
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
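+        # Without parentJobName the extra cannot identify a job, so extraction yields None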
+        assert result is None
+
+    def test_extract_ol_info_insufficient_info(self):
+        """Test extraction when no information is available."""
+        # Mock AssetEvent with no information
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {}
+
+        result = _extract_ol_info_from_asset_event(asset_event)
+
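+        # No task instance, no source fields, no extra: nothing can be derived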
+        assert result is None
+
+
+class TestGetOlJobDependenciesFromAssetEvents:
+    """Tests for _get_ol_job_dependencies_from_asset_events function."""
+
+    def test_get_ol_job_dependencies_no_events(self):
+        """Test when no events are provided."""
+        result = _get_ol_job_dependencies_from_asset_events([])
+
+        assert result == []
+
+    @patch("airflow.providers.openlineage.utils.utils._extract_ol_info_from_asset_event")
+    def test_get_ol_job_dependencies_with_events(self, mock_extract):
+        """Test extraction and deduplication of asset events."""
+        # Mock asset events
+        asset_event1 = MagicMock()
+        asset_event1.id = 1
+        asset_event1.source_run_id = "run1"
+        asset_event1.asset_id = 101
+        asset_event1.dataset_id = 101
+        asset_event1.uri = "s3://bucket/file1"
+        asset_event1.extra = {}
+        asset_event1.partition_key = None
+
+        asset_event2 = MagicMock()
+        asset_event2.id = 2
+        asset_event2.source_run_id = "run2"
+        asset_event2.asset_id = 102
+        asset_event2.dataset_id = 102
+        asset_event2.uri = "s3://bucket/file2"
+        asset_event2.extra = {}
+        asset_event2.partition_key = None
+
+        # Mock extraction results
+        mock_extract.side_effect = [
+            {
+                "job_name": "dag1.task1",
+                "job_namespace": "namespace",
+                "run_id": "550e8400-e29b-41d4-a716-446655440000",
+            },
+            {
+                "job_name": "dag2.task2",
+                "job_namespace": "namespace",
+                "run_id": "550e8400-e29b-41d4-a716-446655440001",
+            },
+        ]
+
+        result = _get_ol_job_dependencies_from_asset_events([asset_event1, asset_event2])
+
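+        # Two distinct (job_name, run_id) pairs produce two separate entries,
+        # each aggregating only its own asset event metadata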
+        assert result == [
+            {
+                "job_name": "dag1.task1",
+                "job_namespace": "namespace",
+                "run_id": "550e8400-e29b-41d4-a716-446655440000",
+                "asset_events": [
+                    {
+                        "dag_run_id": "run1",
+                        "asset_event_id": 1,
+                        "asset_event_extra": None,
+                        "asset_id": 101,
+                        "asset_uri": "s3://bucket/file1",
+                        "partition_key": None,
+                    }
+                ],
+            },
+            {
+                "job_name": "dag2.task2",
+                "job_namespace": "namespace",
+                "run_id": "550e8400-e29b-41d4-a716-446655440001",
+                "asset_events": [
+                    {
+                        "dag_run_id": "run2",
+                        "asset_event_id": 2,
+                        "asset_event_extra": None,
+                        "asset_id": 102,
+                        "asset_uri": "s3://bucket/file2",
+                        "partition_key": None,
+                    }
+                ],
+            },
+        ]
+
+    @patch("airflow.providers.openlineage.utils.utils._extract_ol_info_from_asset_event")
+    def test_get_ol_job_dependencies_deduplication(self, mock_extract):
+        """Test deduplication of duplicate asset events."""
+        # Mock asset events
+        asset_event1 = MagicMock()
+        asset_event1.id = 1
+        asset_event1.source_run_id = "run1"
+        asset_event1.asset_id = 101
+        asset_event1.dataset_id = 101
+        asset_event1.uri = "s3://bucket/file1"
+        asset_event1.extra = {}
+        asset_event1.partition_key = None
+
+        asset_event2 = MagicMock()
+        asset_event2.id = 2
+        asset_event2.source_run_id = "run2"
+        asset_event2.asset_id = 102
+        asset_event2.dataset_id = 102
+        asset_event2.uri = "s3://bucket/file2"
+        asset_event2.extra = {}
+        asset_event2.partition_key = None
+
+        # Mock extraction results - same job/run (should be deduplicated)
+        same_info = {
+            "job_name": "dag1.task1",
+            "job_namespace": "namespace",
+        }
+        mock_extract.side_effect = [same_info, same_info]
+
+        result = _get_ol_job_dependencies_from_asset_events([asset_event1, asset_event2])
+
+        # Should be deduplicated to one entry with both events aggregated
+        assert result == [
+            {
+                "job_name": "dag1.task1",
+                "job_namespace": "namespace",
+                "asset_events": [
+                    {
+                        "dag_run_id": "run1",
+                        "asset_event_id": 1,
+                        "asset_event_extra": None,
+                        "asset_id": 101,
+                        "asset_uri": "s3://bucket/file1",
+                        "partition_key": None,
+                    },
+                    {
+                        "dag_run_id": "run2",
+                        "asset_event_id": 2,
+                        "asset_event_extra": None,
+                        "asset_id": 102,
+                        "asset_uri": "s3://bucket/file2",
+                        "partition_key": None,
+                    },
+                ],
+            }
+        ]
+
+    @patch("airflow.providers.openlineage.utils.utils._extract_ol_info_from_asset_event")
+    def test_get_ol_job_dependencies_insufficient_info(self, mock_extract):
+        """Test handling when extraction returns None."""
+        # Mock asset event
+        asset_event = MagicMock()
+        asset_event.id = 1
+
+        # Mock extraction returning None
+        mock_extract.return_value = None
+
+        result = _get_ol_job_dependencies_from_asset_events([asset_event])
+
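+        # Events whose extraction yields None are dropped from the result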
+        assert result == []
+
+
+class TestGetDagJobDependencyFacet:
+    """Tests for get_dag_job_dependency_facet function.
+
+    These tests mock only the DB-accessing function (_get_eagerly_loaded_dagrun_consumed_asset_events)
+    to test the full flow of facet generation including event processing and facet building.
+    """
+
+    @patch("airflow.providers.openlineage.utils.utils._get_eagerly_loaded_dagrun_consumed_asset_events")
+    def test_get_dag_job_dependency_facet_no_events(self, mock_get_events):
+        """Test when no asset events are found."""
+        mock_get_events.return_value = []
+
+        result = get_dag_job_dependency_facet("test_dag", "test_run_id")
+
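+        # An empty event list produces an empty facet dict; the loader is still
+        # queried exactly once with the given dag_id and run_id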
+        assert result == {}
+        mock_get_events.assert_called_once_with("test_dag", "test_run_id")
+
+    @patch("airflow.providers.openlineage.utils.utils._get_eagerly_loaded_dagrun_consumed_asset_events")
+    def test_get_dag_job_dependency_facet_exception_handling(self, mock_get_events):
+        """Test exception handling in get_dag_job_dependency_facet."""
+        mock_get_events.side_effect = Exception("Database error")
+
+        result = get_dag_job_dependency_facet("test_dag", "test_run_id")
+
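+        # Failures while loading events are swallowed and an empty dict is
+        # returned instead of propagating the exception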
+        assert result == {}
+
+    @patch("airflow.providers.openlineage.utils.utils._get_eagerly_loaded_dagrun_consumed_asset_events")
+    def test_get_dag_job_dependency_facet_insufficient_info_skipped(self, mock_get_events):
+        """Test that events with insufficient info are skipped."""
+        # Create an event with no usable information
+        asset_event = MagicMock()
+        asset_event.source_task_instance = None
+        asset_event.source_dag_run = None
+        asset_event.source_dag_id = None
+        asset_event.source_task_id = None
+        asset_event.extra = {}
+        asset_event.id = 1
+        asset_event.source_run_id = None
+        asset_event.asset_id = 101
+        asset_event.dataset_id = 101
+        asset_event.uri = "s3://bucket/file"
+        asset_event.partition_key = None
+
+        mock_get_events.return_value = [asset_event]
+
+        result = get_dag_job_dependency_facet("test_dag", "test_run_id")
+
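+        # The unattributable event is skipped entirely, so no facet is emitted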
+        assert result == {}
+
+    @patch("airflow.providers.openlineage.utils.utils._get_eagerly_loaded_dagrun_consumed_asset_events")
+    def test_get_dag_job_dependency_facet_with_events(self, mock_get_events):
+        """Test facet generation with asset events - tests full flow."""
+        logical_date = datetime.datetime(2024, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
+
+        # Create mock asset events with source TaskInstance (priority 1 source)
+        ti1 = MagicMock()
+        ti1.dag_id = "source_dag1"
+        ti1.task_id = "source_task1"
+        ti1.try_number = 1
+        ti1.map_index = 0
+
+        source_dr1 = MagicMock()
+        source_dr1.logical_date = logical_date
+        source_dr1.run_after = None
+
+        asset_event1 = MagicMock()
+        asset_event1.source_task_instance = ti1
+        asset_event1.source_dag_run = source_dr1
+        asset_event1.source_dag_id = None
+        asset_event1.source_task_id = None
+        asset_event1.extra = {}
+        asset_event1.id = 1
+        asset_event1.source_run_id = "run1"
+        asset_event1.asset_id = 101
+        asset_event1.dataset_id = 101
+        asset_event1.uri = "s3://bucket/file1"
+        asset_event1.partition_key = None
+
+        # Second event with source fields (priority 2 source, no run_id)
+        asset_event2 = MagicMock()
+        asset_event2.source_task_instance = None
+        asset_event2.source_dag_run = None
+        asset_event2.source_dag_id = "source_dag2"
+        asset_event2.source_task_id = "source_task2"
+        asset_event2.extra = {}
+        asset_event2.id = 2
+        asset_event2.source_run_id = "run2"
+        asset_event2.asset_id = 102
+        asset_event2.dataset_id = 102
+        asset_event2.uri = "s3://bucket/file2"
+        asset_event2.partition_key = None
+
+        mock_get_events.return_value = [asset_event1, asset_event2]
+
+        result = get_dag_job_dependency_facet("test_dag", "test_run_id")
+
+        # Verify result structure
+        assert len(result) == 1
+        facet = result["jobDependencies"]
+        assert len(facet.upstream) == 2
+        assert len(facet.downstream) == 0
+
+        # Verify first dependency (from TaskInstance source, has run_id)
+        dep1 = facet.upstream[0]
+        assert dep1.job.namespace == namespace()
+        assert dep1.job.name == "source_dag1.source_task1"
+        expected_run_id = build_task_instance_ol_run_id(
+            dag_id="source_dag1",
+            task_id="source_task1",
+            try_number=1,
+            logical_date=logical_date,
+            map_index=0,
+        )
+        assert dep1.run.runId == expected_run_id
+        assert dep1.dependency_type == "IMPLICIT_ASSET_DEPENDENCY"
+        assert dep1.airflow["asset_events"] == [
+            {
+                "dag_run_id": "run1",
+                "asset_event_id": 1,
+                "asset_event_extra": None,
+                "asset_id": 101,
+                "asset_uri": "s3://bucket/file1",
+                "partition_key": None,
+            }
+        ]
+
+        # Verify second dependency (from source fields, no run_id)
+        dep2 = facet.upstream[1]
+        assert dep2.job.namespace == namespace()
+        assert dep2.job.name == "source_dag2.source_task2"
+        assert dep2.run is None
+        assert dep2.dependency_type == "IMPLICIT_ASSET_DEPENDENCY"
+        assert dep2.airflow["asset_events"] == [
+            {
+                "dag_run_id": "run2",
+                "asset_event_id": 2,
+                "asset_event_extra": None,
+                "asset_id": 102,
+                "asset_uri": "s3://bucket/file2",
+                "partition_key": None,
+            }
+        ]
+
+    @patch("airflow.providers.openlineage.utils.utils._get_eagerly_loaded_dagrun_consumed_asset_events")
+    def test_get_dag_job_dependency_facet_deduplication(self, mock_get_events):
+        """Test that duplicate asset events from same job/run are 
deduplicated."""
+        logical_date = datetime.datetime(2024, 1, 1, 12, 0, 0, 
tzinfo=datetime.timezone.utc)
+
+        # Create two events from the same source TI (should be deduplicated)
+        ti = MagicMock()
+        ti.dag_id = "source_dag"
+        ti.task_id = "source_task"
+        ti.try_number = 1
+        ti.map_index = 0
+
+        source_dr = MagicMock()
+        source_dr.logical_date = logical_date
+        source_dr.run_after = None
+
+        asset_event1 = MagicMock()
+        asset_event1.source_task_instance = ti
+        asset_event1.source_dag_run = source_dr
+        asset_event1.source_dag_id = None
+        asset_event1.source_task_id = None
+        asset_event1.extra = {}
+        asset_event1.id = 1
+        asset_event1.source_run_id = "run1"
+        asset_event1.asset_id = 101
+        asset_event1.dataset_id = 101
+        asset_event1.uri = "s3://bucket/file1"
+        asset_event1.partition_key = None
+
+        asset_event2 = MagicMock()
+        asset_event2.source_task_instance = ti  # Same TI
+        asset_event2.source_dag_run = source_dr  # Same DR
+        asset_event2.source_dag_id = None
+        asset_event2.source_task_id = None
+        asset_event2.extra = {}
+        asset_event2.id = 2
+        asset_event2.source_run_id = "run1"
+        asset_event2.asset_id = 102  # Different asset
+        asset_event2.dataset_id = 102  # Different asset
+        asset_event2.uri = "s3://bucket/file2"
+        asset_event2.partition_key = None
+
+        mock_get_events.return_value = [asset_event1, asset_event2]
+
+        result = get_dag_job_dependency_facet("test_dag", "test_run_id")
+
+        assert len(result) == 1
+        facet = result["jobDependencies"]
+        assert len(facet.upstream) == 1
+        assert len(facet.downstream) == 0
+
+        # Verify the single deduplicated dependency
+        dep = facet.upstream[0]
+        assert dep.job.namespace == namespace()
+        assert dep.job.name == "source_dag.source_task"
+        expected_run_id = build_task_instance_ol_run_id(
+            dag_id="source_dag",
+            task_id="source_task",
+            try_number=1,
+            logical_date=logical_date,
+            map_index=0,
+        )
+        assert dep.run.runId == expected_run_id
+        assert dep.dependency_type == "IMPLICIT_ASSET_DEPENDENCY"
+
+        # Both asset events should be aggregated into single dependency
+        assert dep.airflow["asset_events"] == [
+            {
+                "dag_run_id": "run1",
+                "asset_event_id": 1,
+                "asset_event_extra": None,
+                "asset_id": 101,
+                "asset_uri": "s3://bucket/file1",
+                "partition_key": None,
+            },
+            {
+                "dag_run_id": "run1",
+                "asset_event_id": 2,
+                "asset_event_extra": None,
+                "asset_id": 102,
+                "asset_uri": "s3://bucket/file2",
+                "partition_key": None,
+            },
+        ]
