This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 22305477bb Use UUIDv7 for OpenLineage runIds (#39889)
22305477bb is described below

commit 22305477bb056cb7a77af59f4dc906ff8a20583d
Author: Maxim Martynov <[email protected]>
AuthorDate: Tue May 28 14:09:05 2024 +0300

    Use UUIDv7 for OpenLineage runIds (#39889)
---
 airflow/providers/openlineage/plugins/adapter.py   |  43 ++++++--
 airflow/providers/openlineage/plugins/listener.py  |  21 ++--
 airflow/providers/openlineage/plugins/macros.py    |   2 +-
 airflow/providers/openlineage/provider.yaml        |   4 +-
 generated/provider_dependencies.json               |   4 +-
 .../providers/openlineage/plugins/test_adapter.py  | 110 ++++++++++++++-------
 .../providers/openlineage/plugins/test_listener.py |  53 +++++-----
 tests/providers/openlineage/plugins/test_macros.py |  25 ++---
 8 files changed, 170 insertions(+), 92 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index e449668ef3..5a5b8ed34b 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import traceback
-import uuid
 from contextlib import ExitStack
 from typing import TYPE_CHECKING
 
@@ -36,6 +35,7 @@ from openlineage.client.facet import (
     SourceCodeLocationJobFacet,
 )
 from openlineage.client.run import Job, Run, RunEvent, RunState
+from openlineage.client.uuid import generate_static_uuid
 
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
@@ -43,6 +43,8 @@ from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
+    from datetime import datetime
+
     from airflow.models.dagrun import DagRun
     from airflow.providers.openlineage.extractors import OperatorLineage
     from airflow.utils.log.secrets_masker import SecretsMasker
@@ -111,15 +113,25 @@ class OpenLineageAdapter(LoggingMixin):
             return yaml.safe_load(config_file)
 
     @staticmethod
-    def build_dag_run_id(dag_id, dag_run_id):
-        return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{conf.namespace()}.{dag_id}.{dag_run_id}"))
+    def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
+        return str(
+            generate_static_uuid(
+                instant=execution_date,
+                data=f"{conf.namespace()}.{dag_id}".encode(),
+            )
+        )
 
     @staticmethod
-    def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
+    def build_task_instance_run_id(
+        dag_id: str,
+        task_id: str,
+        try_number: int,
+        execution_date: datetime,
+    ):
         return str(
-            uuid.uuid3(
-                uuid.NAMESPACE_URL,
-                f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
+            generate_static_uuid(
+                instant=execution_date,
+                data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
             )
         )
 
@@ -306,7 +318,10 @@ class OpenLineageAdapter(LoggingMixin):
                 eventTime=dag_run.start_date.isoformat(),
                 job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
                 run=self._build_run(
-                    run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
+                    run_id=self.build_dag_run_id(
+                        dag_id=dag_run.dag_id,
+                        execution_date=dag_run.execution_date,
+                    ),
                     job_name=dag_run.dag_id,
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
@@ -328,7 +343,12 @@ class OpenLineageAdapter(LoggingMixin):
                 eventType=RunState.COMPLETE,
                 eventTime=dag_run.end_date.isoformat(),
                 job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
-                run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
+                run=Run(
+                    runId=self.build_dag_run_id(
+                        dag_id=dag_run.dag_id,
+                        execution_date=dag_run.execution_date,
+                    ),
+                ),
                 inputs=[],
                 outputs=[],
                 producer=_PRODUCER,
@@ -347,7 +367,10 @@ class OpenLineageAdapter(LoggingMixin):
                 eventTime=dag_run.end_date.isoformat(),
                 job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
                 run=Run(
-                    runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
+                    runId=self.build_dag_run_id(
+                        dag_id=dag_run.dag_id,
+                        execution_date=dag_run.execution_date,
+                    ),
                     facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")},
                 ),
                 inputs=[],
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index dd2c5e8206..cd2901a721 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -111,13 +111,16 @@ class OpenLineageListener:
             # we return here because Airflow 2.3 needs task from deferred state
             if task_instance.next_method is not None:
                 return
-            parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
+            parent_run_id = self.adapter.build_dag_run_id(
+                dag_id=dag.dag_id,
+                execution_date=dagrun.execution_date,
+            )
 
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
-                execution_date=task_instance.execution_date,
                 try_number=task_instance.try_number,
+                execution_date=task_instance.execution_date,
             )
             event_type = RunState.RUNNING.value.lower()
             operator_name = task.task_type.lower()
@@ -184,13 +187,16 @@ class OpenLineageListener:
 
         @print_warning(self.log)
         def on_success():
-            parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
+            parent_run_id = OpenLineageAdapter.build_dag_run_id(
+                dag_id=dag.dag_id,
+                execution_date=dagrun.execution_date,
+            )
 
             task_uuid = OpenLineageAdapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
-                execution_date=task_instance.execution_date,
                 try_number=_get_try_number_success(task_instance),
+                execution_date=task_instance.execution_date,
             )
             event_type = RunState.COMPLETE.value.lower()
             operator_name = task.task_type.lower()
@@ -246,13 +252,16 @@ class OpenLineageListener:
 
         @print_warning(self.log)
         def on_failure():
-            parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
+            parent_run_id = OpenLineageAdapter.build_dag_run_id(
+                dag_id=dag.dag_id,
+                execution_date=dagrun.execution_date,
+            )
 
             task_uuid = OpenLineageAdapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
-                execution_date=task_instance.execution_date,
                 try_number=task_instance.try_number,
+                execution_date=task_instance.execution_date,
             )
             event_type = RunState.FAIL.value.lower()
             operator_name = task.task_type.lower()
diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py
index ddfceb3459..6bb1699aa7 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -61,8 +61,8 @@ def lineage_run_id(task_instance: TaskInstance):
     return OpenLineageAdapter.build_task_instance_run_id(
         dag_id=task_instance.dag_id,
         task_id=task_instance.task_id,
-        execution_date=task_instance.execution_date,
         try_number=task_instance.try_number,
+        execution_date=task_instance.execution_date,
     )
 
 
diff --git a/airflow/providers/openlineage/provider.yaml b/airflow/providers/openlineage/provider.yaml
index b3b293c0c1..de17e15c9f 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -45,8 +45,8 @@ dependencies:
   - apache-airflow>=2.7.0
   - apache-airflow-providers-common-sql>=1.6.0
   - attrs>=22.2
-  - openlineage-integration-common>=0.28.0
-  - openlineage-python>=0.28.0
+  - openlineage-integration-common>=1.15.0
+  - openlineage-python>=1.15.0
 
 integrations:
   - integration-name: OpenLineage
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 0bb692425f..b577aeb7c8 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -900,8 +900,8 @@
       "apache-airflow-providers-common-sql>=1.6.0",
       "apache-airflow>=2.7.0",
       "attrs>=22.2",
-      "openlineage-integration-common>=0.28.0",
-      "openlineage-python>=0.28.0"
+      "openlineage-integration-common>=1.15.0",
+      "openlineage-python>=1.15.0"
     ],
     "devel-deps": [],
     "plugins": [
diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py
index 77f6676e2d..6f010b1a2d 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -523,10 +523,10 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -538,7 +538,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
     dagrun_mock.start_date = event_time
     dagrun_mock.run_id = run_id
     dagrun_mock.dag_id = dag_id
-    uuid.uuid3.return_value = random_uuid
+    generate_static_uuid.return_value = random_uuid
 
     adapter.dag_started(
         dag_run=dagrun_mock,
@@ -582,10 +582,10 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -598,7 +598,7 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
     dagrun_mock.end_date = event_time
     dagrun_mock.run_id = run_id
     dagrun_mock.dag_id = dag_id
-    uuid.uuid3.return_value = random_uuid
+    generate_static_uuid.return_value = random_uuid
 
     adapter.dag_success(
         dag_run=dagrun_mock,
@@ -632,10 +632,10 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
     random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
     client = MagicMock()
     adapter = OpenLineageAdapter(client)
@@ -648,7 +648,7 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
     dagrun_mock.end_date = event_time
     dagrun_mock.run_id = run_id
     dagrun_mock.dag_id = dag_id
-    uuid.uuid3.return_value = random_uuid
+    generate_static_uuid.return_value = random_uuid
 
     adapter.dag_failed(
         dag_run=dagrun_mock,
@@ -707,46 +707,82 @@ def test_openlineage_adapter_stats_emit_failed(
 
 def test_build_dag_run_id_is_valid_uuid():
     dag_id = "test_dag"
-    dag_run_id = "run_1"
-    result = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
-    assert uuid.UUID(result)
+    execution_date = datetime.datetime.now()
+    result = OpenLineageAdapter.build_dag_run_id(
+        dag_id=dag_id,
+        execution_date=execution_date,
+    )
+    uuid_result = uuid.UUID(result)
+    assert uuid_result
+    assert uuid_result.version == 7
 
 
-def test_build_dag_run_id_different_inputs_give_different_results():
-    result1 = OpenLineageAdapter.build_dag_run_id("dag1", "run1")
-    result2 = OpenLineageAdapter.build_dag_run_id("dag2", "run2")
-    assert result1 != result2
+def test_build_dag_run_id_same_input_give_same_result():
+    result1 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag1",
+        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+    )
+    result2 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag1",
+        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+    )
+    assert result1 == result2
 
 
-def test_build_dag_run_id_uses_correct_methods_underneath():
-    dag_id = "test_dag"
-    dag_run_id = "run_1"
-    expected = str(uuid.uuid3(uuid.NAMESPACE_URL, f"{namespace()}.{dag_id}.{dag_run_id}"))
-    actual = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
-    assert actual == expected
+def test_build_dag_run_id_different_inputs_give_different_results():
+    result1 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag1",
+        execution_date=datetime.datetime.now(),
+    )
+    result2 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag2",
+        execution_date=datetime.datetime.now(),
+    )
+    assert result1 != result2
 
 
 def test_build_task_instance_run_id_is_valid_uuid():
-    result = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task_1", "2023-01-01", 1)
-    assert uuid.UUID(result)
+    result = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag_id",
+        task_id="task_id",
+        try_number=1,
+        execution_date=datetime.datetime.now(),
+    )
+    uuid_result = uuid.UUID(result)
+    assert uuid_result
+    assert uuid_result.version == 7
 
 
-def test_build_task_instance_run_id_different_inputs_gives_different_results():
-    result1 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task1", "2023-01-01", 1)
-    result2 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task2", "2023-01-02", 2)
-    assert result1 != result2
+def test_build_task_instance_run_id_same_input_gives_same_result():
+    result1 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+    )
+    result2 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+    )
+    assert result1 == result2
 
 
-def test_build_task_instance_run_id_uses_correct_methods_underneath():
-    dag_id = "dag_1"
-    task_id = "task_1"
-    execution_date = "2023-01-01"
-    try_number = 1
-    expected = str(
-        uuid.uuid3(uuid.NAMESPACE_URL, f"{namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}")
+def test_build_task_instance_run_id_different_inputs_gives_different_results():
+    result1 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag1",
+        task_id="task1",
+        try_number=1,
+        execution_date=datetime.datetime.now(),
     )
-    actual = OpenLineageAdapter.build_task_instance_run_id(dag_id, task_id, execution_date, try_number)
-    assert actual == expected
+    result2 = OpenLineageAdapter.build_task_instance_run_id(
+        dag_id="dag2",
+        task_id="task2",
+        try_number=2,
+        execution_date=datetime.datetime.now(),
+    )
+    assert result1 != result2
 
 
 def test_configuration_precedence_when_creating_ol_client():
diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py
index 905a00e444..ca5708bbba 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -165,8 +165,11 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
         # Now you can use listener and task_instance in your tests to simulate their interaction.
     """
 
-    def mock_task_id(dag_id, task_id, execution_date, try_number):
-        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+    def mock_dag_id(dag_id, execution_date):
+        return f"{execution_date}.{dag_id}"
+
+    def mock_task_id(dag_id, task_id, try_number, execution_date):
+        return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
 
     listener = OpenLineageListener()
     listener.log = mock.Mock()
@@ -177,7 +180,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
     listener.extractor_manager.extract_metadata.return_value = metadata
 
     adapter = mock.Mock()
-    adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
+    adapter.build_dag_run_id.side_effect = mock_dag_id
     adapter.build_task_instance_run_id.side_effect = mock_task_id
     adapter.start_task = mock.Mock()
     adapter.fail_task = mock.Mock()
@@ -189,6 +192,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
     task_instance.dag_run.run_id = "dag_run_run_id"
     task_instance.dag_run.data_interval_start = None
     task_instance.dag_run.data_interval_end = None
+    task_instance.dag_run.execution_date = "execution_date"
     task_instance.task = mock.Mock()
     task_instance.task.task_id = "task_id"
     task_instance.task.dag = mock.Mock()
@@ -230,12 +234,12 @@ def test_adapter_start_task_is_called_with_proper_arguments(
 
     listener.on_task_instance_running(None, task_instance, None)
     listener.adapter.start_task.assert_called_once_with(
-        run_id="dag_id.task_id.execution_date.1",
+        run_id="execution_date.dag_id.task_id.1",
         job_name="job_name",
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
         parent_job_name="dag_id",
-        parent_run_id="dag_id.dag_run_run_id",
+        parent_run_id="execution_date.dag_id",
         code_location=None,
         nominal_start_time=None,
         nominal_end_time=None,
@@ -260,13 +264,16 @@ def test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
     failure events, thus confirming that the adapter's failure handling is functioning as expected.
     """
 
-    def mock_task_id(dag_id, task_id, execution_date, try_number):
-        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+    def mock_dag_id(dag_id, execution_date):
+        return f"{execution_date}.{dag_id}"
+
+    def mock_task_id(dag_id, task_id, try_number, execution_date):
+        return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
 
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
+    mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
-    mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
     mock_disabled.return_value = False
 
     listener.on_task_instance_failed(None, task_instance, None)
@@ -274,8 +281,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="dag_id.dag_run_run_id",
-        run_id="dag_id.task_id.execution_date.1",
+        parent_run_id="execution_date.dag_id",
+        run_id="execution_date.dag_id.task_id.1",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -295,13 +302,16 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
     during the task's lifecycle events.
     """
 
-    def mock_task_id(dag_id, task_id, execution_date, try_number):
-        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+    def mock_dag_id(dag_id, execution_date):
+        return f"{execution_date}.{dag_id}"
+
+    def mock_task_id(dag_id, task_id, try_number, execution_date):
+        return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
 
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
+    mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
-    mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
     mock_disabled.return_value = False
 
     listener.on_task_instance_success(None, task_instance, None)
@@ -313,8 +323,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="dag_id.dag_run_run_id",
-        run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_1}",
+        parent_run_id="execution_date.dag_id",
+        run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -328,8 +338,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="dag_id.dag_run_run_id",
-        run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_2}",
+        parent_run_id="execution_date.dag_id",
+        run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_2}",
         task=listener.extractor_manager.extract_metadata(),
     )
 
@@ -343,15 +353,14 @@ def test_run_id_is_constant_across_all_methods(mocked_adapter):
     try_number attribute, as it would occur in Airflow, to verify that the run_id updates accordingly.
     """
 
-    def mock_task_id(dag_id, task_id, execution_date, try_number):
+    def mock_task_id(dag_id, task_id, try_number, execution_date):
         returned_try_number = try_number if AIRFLOW_V_2_10_PLUS else max(try_number - 1, 1)
-
-        return f"{dag_id}.{task_id}.{execution_date}.{returned_try_number}"
+        return f"{execution_date}.{dag_id}.{task_id}.{returned_try_number}"
 
     listener, task_instance = _create_listener_and_task_instance()
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
-    expected_run_id_1 = "dag_id.task_id.execution_date.1"
-    expected_run_id_2 = "dag_id.task_id.execution_date.2"
+    expected_run_id_1 = "execution_date.dag_id.task_id.1"
+    expected_run_id_2 = "execution_date.dag_id.task_id.2"
     listener.on_task_instance_running(None, task_instance, None)
     assert listener.adapter.start_task.call_args.kwargs["run_id"] == expected_run_id_1
 
diff --git a/tests/providers/openlineage/plugins/test_macros.py b/tests/providers/openlineage/plugins/test_macros.py
index a735312ab6..c8ae74aca4 100644
--- a/tests/providers/openlineage/plugins/test_macros.py
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-import uuid
+from datetime import datetime, timezone
 from unittest import mock
 
 from airflow.providers.openlineage.conf import namespace
@@ -38,8 +38,8 @@ def test_lineage_job_name():
     task_instance = mock.MagicMock(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
         try_number=1,
+        execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
     )
     assert lineage_job_name(task_instance) == "dag_id.task_id"
 
@@ -48,17 +48,18 @@ def test_lineage_run_id():
     task_instance = mock.MagicMock(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
+        dag_run=mock.MagicMock(run_id="run_id"),
+        execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
         try_number=1,
     )
-    actual = lineage_run_id(task_instance)
-    expected = str(
-        uuid.uuid3(
-            uuid.NAMESPACE_URL,
-            f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
-        )
-    )
-    assert actual == expected
+
+    call_result1 = lineage_run_id(task_instance)
+    call_result2 = lineage_run_id(task_instance)
+
+    # random part value does not matter, it just has to be the same for the same TaskInstance
+    assert call_result1 == call_result2
+    # execution_date is used as most significant bits of UUID
+    assert call_result1.startswith("016f5e9e-c4c8-")
 
 
 @mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
@@ -67,8 +68,8 @@ def test_lineage_parent_id(mock_run_id):
     task_instance = mock.MagicMock(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
         try_number=1,
+        execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
     )
     actual = lineage_parent_id(task_instance)
     expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"

Reply via email to