This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95a83102e8 feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659)
95a83102e8 is described below

commit 95a83102e8753c2f8caf5b0d5c847f4c7f254f67
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Jan 9 13:15:07 2024 +0100

    feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659)
---
 airflow/providers/dbt/cloud/utils/openlineage.py   |  5 +-
 airflow/providers/openlineage/plugins/adapter.py   |  4 +-
 airflow/providers/openlineage/plugins/listener.py  | 27 ++++++----
 airflow/providers/openlineage/plugins/macros.py    | 10 +++-
 tests/always/test_project_structure.py             |  1 -
 .../providers/openlineage/plugins/test_listener.py | 57 +++++++++++++++++-----
 tests/providers/openlineage/plugins/test_macros.py | 52 ++++++++++++++++++++
 .../plugins/test_openlineage_adapter.py            | 44 ++++++++---------
 8 files changed, 149 insertions(+), 51 deletions(-)

diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py
index 6a0934d412..f86c77a689 100644
--- a/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -121,7 +121,10 @@ def generate_openlineage_events_from_dbt_cloud_run(
 
         # generate same run id of current task instance
         parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
-            operator.task_id, task_instance.execution_date, task_instance.try_number - 1
+            dag_id=task_instance.dag_id,
+            task_id=operator.task_id,
+            execution_date=task_instance.execution_date,
+            try_number=task_instance.try_number - 1,
         )
 
         parent_job = ParentRunMetadata(
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index ad648f8828..6f16b01706 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -102,11 +102,11 @@ class OpenLineageAdapter(LoggingMixin):
         return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}"))
 
     @staticmethod
-    def build_task_instance_run_id(task_id, execution_date, try_number):
+    def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
         return str(
             uuid.uuid3(
                 uuid.NAMESPACE_URL,
-                f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}",
+                f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}",
             )
         )
 
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 6fa02b9fc1..8c731dd6ff 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -77,7 +77,10 @@ class OpenLineageListener:
             parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
 
             task_uuid = self.adapter.build_task_instance_run_id(
-                task.task_id, task_instance.execution_date, task_instance.try_number
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number,
             )
 
             task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
@@ -116,14 +119,17 @@ class OpenLineageListener:
         task = task_instance.task
         dag = task.dag
 
-        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
-            task.task_id, task_instance.execution_date, task_instance.try_number - 1
-        )
-
         @print_warning(self.log)
         def on_success():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
 
+            task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number - 1,
+            )
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
@@ -149,14 +155,17 @@ class OpenLineageListener:
         task = task_instance.task
         dag = task.dag
 
-        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
-            task.task_id, task_instance.execution_date, task_instance.try_number
-        )
-
         @print_warning(self.log)
         def on_failure():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
 
+            task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number,
+            )
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py
index 61af81a1eb..a4039db2f4 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -39,7 +39,10 @@ def lineage_run_id(task_instance: TaskInstance):
         :ref:`howto/macros:openlineage`
     """
     return OpenLineageAdapter.build_task_instance_run_id(
-        task_instance.task.task_id, task_instance.execution_date, task_instance.try_number
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task.task_id,
+        execution_date=task_instance.execution_date,
+        try_number=task_instance.try_number,
     )
 
 
@@ -55,6 +58,9 @@ def lineage_parent_id(run_id: str, task_instance: TaskInstance):
         :ref:`howto/macros:openlineage`
     """
     job_name = OpenLineageAdapter.build_task_instance_run_id(
-        task_instance.task.task_id, task_instance.execution_date, task_instance.try_number
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task.task_id,
+        execution_date=task_instance.execution_date,
+        try_number=task_instance.try_number,
     )
     return f"{_JOB_NAMESPACE}/{job_name}/{run_id}"
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index c88387919e..ca59b70c6f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -163,7 +163,6 @@ class TestProjectStructure:
             "tests/providers/openlineage/extractors/test_manager.py",
             "tests/providers/openlineage/plugins/test_adapter.py",
             "tests/providers/openlineage/plugins/test_facets.py",
-            "tests/providers/openlineage/plugins/test_macros.py",
             "tests/providers/openlineage/test_sqlparser.py",
             "tests/providers/redis/operators/test_redis_publish.py",
             "tests/providers/redis/sensors/test_redis_key.py",
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index c616f77f97..827c17c9f7 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -151,6 +151,10 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
         listener, task_instance = _create_listener_and_task_instance()
         # Now you can use listener and task_instance in your tests to simulate their interaction.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener = OpenLineageListener()
     listener.log = mock.Mock()
     listener.extractor_manager = mock.Mock()
@@ -161,7 +165,7 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
 
     adapter = mock.Mock()
     adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
-    adapter.build_task_instance_run_id.side_effect = lambda x, y, z: f"{x}.{y}.{z}"
+    adapter.build_task_instance_run_id.side_effect = mock_task_id
     adapter.start_task = mock.Mock()
     adapter.fail_task = mock.Mock()
     adapter.complete_task = mock.Mock()
@@ -211,7 +215,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
 
     listener.on_task_instance_running(None, task_instance, None)
     listener.adapter.start_task.assert_called_once_with(
-        run_id="task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.1",
         job_name="job_name",
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
@@ -239,9 +243,13 @@ def 
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
     the test verifies the integrity and consistency of the data passed to the 
adapter during task
     failure events, thus confirming that the adapter's failure handling is 
functioning as expected.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: 
f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
 
     listener.on_task_instance_failed(None, task_instance, None)
@@ -250,7 +258,7 @@ def 
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -266,9 +274,13 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
     accordingly. This helps confirm the consistency and correctness of the 
data passed to the adapter
     during the task's lifecycle events.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: 
f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
 
     listener.on_task_instance_success(None, task_instance, None)
@@ -279,7 +291,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.0",
+        run_id="dag_id.task_id.execution_date.0",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -292,7 +304,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -305,12 +317,16 @@ def 
test_run_id_is_constant_across_all_methods(mocked_adapter):
     reflecting the task's identity and execution context. The test also 
simulates the change in the
     try_number attribute, as it would occur in Airflow, to verify that the 
run_id updates accordingly.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: 
f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
 
     listener.on_task_instance_running(None, task_instance, None)
     expected_run_id = listener.adapter.start_task.call_args.kwargs["run_id"]
-    assert expected_run_id == "task_id.execution_date.1"
+    assert expected_run_id == "dag_id.task_id.execution_date.1"
 
     listener.on_task_instance_failed(None, task_instance, None)
     assert listener.adapter.fail_task.call_args.kwargs["run_id"] == 
expected_run_id
@@ -318,7 +334,7 @@ def 
test_run_id_is_constant_across_all_methods(mocked_adapter):
     # This run_id will be different as we did NOT simulate increase of the 
try_number attribute,
     # which happens in Airflow.
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == 
"task_id.execution_date.0"
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == 
"dag_id.task_id.execution_date.0"
 
     # Now we simulate the increase of try_number, and the run_id should 
reflect that change.
     # This is how airflow works, and that's why we expect the run_id to remain 
constant across all methods.
@@ -336,7 +352,12 @@ def 
test_running_task_correctly_calls_openlineage_adapter_run_id_method():
     """
     listener, task_instance = _create_listener_and_task_instance()
     listener.on_task_instance_running(None, task_instance, None)
-    listener.adapter.build_task_instance_run_id.assert_called_once_with("task_id", "execution_date", 1)
+    listener.adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
+    )
 
 
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@@ -349,7 +370,12 @@ def 
test_failed_task_correctly_calls_openlineage_adapter_run_id_method(mock_adap
     """
     listener, task_instance = _create_listener_and_task_instance()
     listener.on_task_instance_failed(None, task_instance, None)
-    mock_adapter.build_task_instance_run_id.assert_called_with("task_id", "execution_date", 1)
+    mock_adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
+    )
 
 
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@@ -362,7 +388,12 @@ def 
test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_
     """
     listener, task_instance = _create_listener_and_task_instance()
     listener.on_task_instance_success(None, task_instance, None)
-    mock_adapter.build_task_instance_run_id.assert_called_with("task_id", "execution_date", 0)
+    mock_adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=0,
+    )
 
 
 @mock.patch("airflow.models.taskinstance.get_listener_manager")
diff --git a/tests/providers/openlineage/plugins/test_macros.py b/tests/providers/openlineage/plugins/test_macros.py
new file mode 100644
index 0000000000..bea73628c1
--- /dev/null
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import uuid
+from unittest import mock
+
+from airflow.providers.openlineage.plugins.adapter import _DAG_NAMESPACE
+from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
+
+
+def test_lineage_run_id():
+    task = mock.MagicMock(
+        dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
+    )
+    actual = lineage_run_id(task)
+    expected = str(
+        uuid.uuid3(
+            uuid.NAMESPACE_URL,
+            f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
+        )
+    )
+    assert actual == expected
+
+
+def test_lineage_parent_id():
+    task = mock.MagicMock(
+        dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
+    )
+    actual = lineage_parent_id(run_id="run_id", task_instance=task)
+    job_name = str(
+        uuid.uuid3(
+            uuid.NAMESPACE_URL,
+            f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
+        )
+    )
+    expected = f"{_DAG_NAMESPACE}/{job_name}/run_id"
+    assert actual == expected
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 70d374b07e..cdec3de941 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -161,7 +161,7 @@ def test_emit_start_event(mock_stats_incr, 
mock_stats_timer):
                     },
                 ),
                 job=Job(
-                    namespace="default",
+                    namespace=_DAG_NAMESPACE,
                     name="job",
                     facets={"documentation": 
DocumentationJobFacet(description="description")},
                 ),
@@ -222,18 +222,18 @@ def 
test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat
                         ),
                         "parent": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "parentRun": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "externalQuery1": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
                         "externalQuery2": 
ExternalQueryRunFacet(externalQueryId="999", source="source"),
                     },
                 ),
                 job=Job(
-                    namespace="default",
+                    namespace=_DAG_NAMESPACE,
                     name="job",
                     facets={
                         "documentation": 
DocumentationJobFacet(description="description"),
@@ -284,7 +284,7 @@ def test_emit_complete_event(mock_stats_incr, 
mock_stats_timer):
                 eventType=RunState.COMPLETE,
                 eventTime=event_time,
                 run=Run(runId=run_id, facets={}),
-                job=Job(namespace="default", name="job", facets={}),
+                job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
                 producer=_PRODUCER,
                 inputs=[],
                 outputs=[],
@@ -329,16 +329,16 @@ def 
test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
                     facets={
                         "parent": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "parentRun": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
                     },
                 ),
-                job=Job(namespace="default", name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
+                job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
                 producer=_PRODUCER,
                 inputs=[
                     Dataset(namespace="bigquery", name="a.b.c"),
@@ -377,7 +377,7 @@ def test_emit_failed_event(mock_stats_incr, 
mock_stats_timer):
                 eventType=RunState.FAIL,
                 eventTime=event_time,
                 run=Run(runId=run_id, facets={}),
-                job=Job(namespace="default", name="job", facets={}),
+                job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
                 producer=_PRODUCER,
                 inputs=[],
                 outputs=[],
@@ -422,16 +422,16 @@ def 
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
                     facets={
                         "parent": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "parentRun": ParentRunFacet(
                             run={"runId": "parent_run_id"},
-                            job={"namespace": "default", "name": 
"parent_job_name"},
+                            job={"namespace": _DAG_NAMESPACE, "name": 
"parent_job_name"},
                         ),
                         "externalQuery": 
ExternalQueryRunFacet(externalQueryId="123", source="source"),
                     },
                 ),
-                job=Job(namespace="default", name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
+                job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": 
SqlJobFacet(query="SELECT 1;")}),
                 producer=_PRODUCER,
                 inputs=[
                     Dataset(namespace="bigquery", name="a.b.c"),
@@ -485,7 +485,7 @@ def test_emit_dag_started_event(mock_stats_incr, 
mock_stats_timer, uuid):
                         )
                     },
                 ),
-                job=Job(namespace="default", name="dag_id", facets={}),
+                job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
                 producer=_PRODUCER,
                 inputs=[],
                 outputs=[],
@@ -527,7 +527,7 @@ def test_emit_dag_complete_event(mock_stats_incr, 
mock_stats_timer, uuid):
                 eventType=RunState.COMPLETE,
                 eventTime=event_time.isoformat(),
                 run=Run(runId=random_uuid, facets={}),
-                job=Job(namespace="default", name="dag_id", facets={}),
+                job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
                 producer=_PRODUCER,
                 inputs=[],
                 outputs=[],
@@ -576,7 +576,7 @@ def test_emit_dag_failed_event(mock_stats_incr, 
mock_stats_timer, uuid):
                         )
                     },
                 ),
-                job=Job(namespace="default", name="dag_id", facets={}),
+                job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
                 producer=_PRODUCER,
                 inputs=[],
                 outputs=[],
@@ -627,25 +627,23 @@ def 
test_build_dag_run_id_uses_correct_methods_underneath():
 
 
 def test_build_task_instance_run_id_is_valid_uuid():
-    task_id = "task_1"
-    execution_date = "2023-01-01"
-    try_number = 1
-    result = OpenLineageAdapter.build_task_instance_run_id(task_id, execution_date, try_number)
+    result = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task_1", "2023-01-01", 1)
     assert uuid.UUID(result)
 
 
 def test_build_task_instance_run_id_different_inputs_gives_different_results():
-    result1 = OpenLineageAdapter.build_task_instance_run_id("task1", "2023-01-01", 1)
-    result2 = OpenLineageAdapter.build_task_instance_run_id("task2", "2023-01-02", 2)
+    result1 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task1", "2023-01-01", 1)
+    result2 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task2", "2023-01-02", 2)
     assert result1 != result2
 
 
 def test_build_task_instance_run_id_uses_correct_methods_underneath():
+    dag_id = "dag_1"
     task_id = "task_1"
     execution_date = "2023-01-01"
     try_number = 1
     expected = str(
-        uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}")
+        uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}")
     )
-    actual = OpenLineageAdapter.build_task_instance_run_id(task_id, execution_date, try_number)
+    actual = OpenLineageAdapter.build_task_instance_run_id(dag_id, task_id, execution_date, try_number)
     assert actual == expected

Reply via email to