This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 22305477bb Use UUIDv7 for OpenLineage runIds (#39889)
22305477bb is described below
commit 22305477bb056cb7a77af59f4dc906ff8a20583d
Author: Maxim Martynov <[email protected]>
AuthorDate: Tue May 28 14:09:05 2024 +0300
Use UUIDv7 for OpenLineage runIds (#39889)
---
airflow/providers/openlineage/plugins/adapter.py | 43 ++++++--
airflow/providers/openlineage/plugins/listener.py | 21 ++--
airflow/providers/openlineage/plugins/macros.py | 2 +-
airflow/providers/openlineage/provider.yaml | 4 +-
generated/provider_dependencies.json | 4 +-
.../providers/openlineage/plugins/test_adapter.py | 110 ++++++++++++++-------
.../providers/openlineage/plugins/test_listener.py | 53 +++++-----
tests/providers/openlineage/plugins/test_macros.py | 25 ++---
8 files changed, 170 insertions(+), 92 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index e449668ef3..5a5b8ed34b 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import traceback
-import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING
@@ -36,6 +35,7 @@ from openlineage.client.facet import (
SourceCodeLocationJobFacet,
)
from openlineage.client.run import Job, Run, RunEvent, RunState
+from openlineage.client.uuid import generate_static_uuid
from airflow.providers.openlineage import __version__ as
OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
@@ -43,6 +43,8 @@ from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
+ from datetime import datetime
+
from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
@@ -111,15 +113,25 @@ class OpenLineageAdapter(LoggingMixin):
return yaml.safe_load(config_file)
@staticmethod
- def build_dag_run_id(dag_id, dag_run_id):
- return str(uuid.uuid3(uuid.NAMESPACE_URL,
f"{conf.namespace()}.{dag_id}.{dag_run_id}"))
+ def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
+ return str(
+ generate_static_uuid(
+ instant=execution_date,
+ data=f"{conf.namespace()}.{dag_id}".encode(),
+ )
+ )
@staticmethod
- def build_task_instance_run_id(dag_id, task_id, execution_date,
try_number):
+ def build_task_instance_run_id(
+ dag_id: str,
+ task_id: str,
+ try_number: int,
+ execution_date: datetime,
+ ):
return str(
- uuid.uuid3(
- uuid.NAMESPACE_URL,
-
f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
+ generate_static_uuid(
+ instant=execution_date,
+
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
)
)
@@ -306,7 +318,10 @@ class OpenLineageAdapter(LoggingMixin):
eventTime=dag_run.start_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
run=self._build_run(
- run_id=self.build_dag_run_id(dag_run.dag_id,
dag_run.run_id),
+ run_id=self.build_dag_run_id(
+ dag_id=dag_run.dag_id,
+ execution_date=dag_run.execution_date,
+ ),
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
@@ -328,7 +343,12 @@ class OpenLineageAdapter(LoggingMixin):
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
- run=Run(runId=self.build_dag_run_id(dag_run.dag_id,
dag_run.run_id)),
+ run=Run(
+ runId=self.build_dag_run_id(
+ dag_id=dag_run.dag_id,
+ execution_date=dag_run.execution_date,
+ ),
+ ),
inputs=[],
outputs=[],
producer=_PRODUCER,
@@ -347,7 +367,10 @@ class OpenLineageAdapter(LoggingMixin):
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG),
run=Run(
- runId=self.build_dag_run_id(dag_run.dag_id,
dag_run.run_id),
+ runId=self.build_dag_run_id(
+ dag_id=dag_run.dag_id,
+ execution_date=dag_run.execution_date,
+ ),
facets={"errorMessage": ErrorMessageRunFacet(message=msg,
programmingLanguage="python")},
),
inputs=[],
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index dd2c5e8206..cd2901a721 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -111,13 +111,16 @@ class OpenLineageListener:
# we return here because Airflow 2.3 needs task from deferred state
if task_instance.next_method is not None:
return
- parent_run_id = self.adapter.build_dag_run_id(dag.dag_id,
dagrun.run_id)
+ parent_run_id = self.adapter.build_dag_run_id(
+ dag_id=dag.dag_id,
+ execution_date=dagrun.execution_date,
+ )
task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
- execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
+ execution_date=task_instance.execution_date,
)
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()
@@ -184,13 +187,16 @@ class OpenLineageListener:
@print_warning(self.log)
def on_success():
- parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id,
dagrun.run_id)
+ parent_run_id = OpenLineageAdapter.build_dag_run_id(
+ dag_id=dag.dag_id,
+ execution_date=dagrun.execution_date,
+ )
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
- execution_date=task_instance.execution_date,
try_number=_get_try_number_success(task_instance),
+ execution_date=task_instance.execution_date,
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()
@@ -246,13 +252,16 @@ class OpenLineageListener:
@print_warning(self.log)
def on_failure():
- parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id,
dagrun.run_id)
+ parent_run_id = OpenLineageAdapter.build_dag_run_id(
+ dag_id=dag.dag_id,
+ execution_date=dagrun.execution_date,
+ )
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
- execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
+ execution_date=task_instance.execution_date,
)
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()
diff --git a/airflow/providers/openlineage/plugins/macros.py
b/airflow/providers/openlineage/plugins/macros.py
index ddfceb3459..6bb1699aa7 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -61,8 +61,8 @@ def lineage_run_id(task_instance: TaskInstance):
return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
- execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
+ execution_date=task_instance.execution_date,
)
diff --git a/airflow/providers/openlineage/provider.yaml
b/airflow/providers/openlineage/provider.yaml
index b3b293c0c1..de17e15c9f 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -45,8 +45,8 @@ dependencies:
- apache-airflow>=2.7.0
- apache-airflow-providers-common-sql>=1.6.0
- attrs>=22.2
- - openlineage-integration-common>=0.28.0
- - openlineage-python>=0.28.0
+ - openlineage-integration-common>=1.15.0
+ - openlineage-python>=1.15.0
integrations:
- integration-name: OpenLineage
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 0bb692425f..b577aeb7c8 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -900,8 +900,8 @@
"apache-airflow-providers-common-sql>=1.6.0",
"apache-airflow>=2.7.0",
"attrs>=22.2",
- "openlineage-integration-common>=0.28.0",
- "openlineage-python>=0.28.0"
+ "openlineage-integration-common>=1.15.0",
+ "openlineage-python>=1.15.0"
],
"devel-deps": [],
"plugins": [
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index 77f6676e2d..6f010b1a2d 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -523,10 +523,10 @@ def
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -538,7 +538,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, uuid):
dagrun_mock.start_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
- uuid.uuid3.return_value = random_uuid
+ generate_static_uuid.return_value = random_uuid
adapter.dag_started(
dag_run=dagrun_mock,
@@ -582,10 +582,10 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, uuid):
mock_stats_timer.assert_called_with("ol.emit.attempts")
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -598,7 +598,7 @@ def test_emit_dag_complete_event(mock_stats_incr,
mock_stats_timer, uuid):
dagrun_mock.end_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
- uuid.uuid3.return_value = random_uuid
+ generate_static_uuid.return_value = random_uuid
adapter.dag_success(
dag_run=dagrun_mock,
@@ -632,10 +632,10 @@ def test_emit_dag_complete_event(mock_stats_incr,
mock_stats_timer, uuid):
mock_stats_timer.assert_called_with("ol.emit.attempts")
-@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
+@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
-def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer,
generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -648,7 +648,7 @@ def test_emit_dag_failed_event(mock_stats_incr,
mock_stats_timer, uuid):
dagrun_mock.end_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
- uuid.uuid3.return_value = random_uuid
+ generate_static_uuid.return_value = random_uuid
adapter.dag_failed(
dag_run=dagrun_mock,
@@ -707,46 +707,82 @@ def test_openlineage_adapter_stats_emit_failed(
def test_build_dag_run_id_is_valid_uuid():
dag_id = "test_dag"
- dag_run_id = "run_1"
- result = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
- assert uuid.UUID(result)
+ execution_date = datetime.datetime.now()
+ result = OpenLineageAdapter.build_dag_run_id(
+ dag_id=dag_id,
+ execution_date=execution_date,
+ )
+ uuid_result = uuid.UUID(result)
+ assert uuid_result
+ assert uuid_result.version == 7
-def test_build_dag_run_id_different_inputs_give_different_results():
- result1 = OpenLineageAdapter.build_dag_run_id("dag1", "run1")
- result2 = OpenLineageAdapter.build_dag_run_id("dag2", "run2")
- assert result1 != result2
+def test_build_dag_run_id_same_input_give_same_result():
+ result1 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag1",
+ execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ )
+ result2 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag1",
+ execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ )
+ assert result1 == result2
-def test_build_dag_run_id_uses_correct_methods_underneath():
- dag_id = "test_dag"
- dag_run_id = "run_1"
- expected = str(uuid.uuid3(uuid.NAMESPACE_URL,
f"{namespace()}.{dag_id}.{dag_run_id}"))
- actual = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
- assert actual == expected
+def test_build_dag_run_id_different_inputs_give_different_results():
+ result1 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag1",
+ execution_date=datetime.datetime.now(),
+ )
+ result2 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag2",
+ execution_date=datetime.datetime.now(),
+ )
+ assert result1 != result2
def test_build_task_instance_run_id_is_valid_uuid():
- result = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task_1",
"2023-01-01", 1)
- assert uuid.UUID(result)
+ result = OpenLineageAdapter.build_task_instance_run_id(
+ dag_id="dag_id",
+ task_id="task_id",
+ try_number=1,
+ execution_date=datetime.datetime.now(),
+ )
+ uuid_result = uuid.UUID(result)
+ assert uuid_result
+ assert uuid_result.version == 7
-def test_build_task_instance_run_id_different_inputs_gives_different_results():
- result1 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task1",
"2023-01-01", 1)
- result2 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task2",
"2023-01-02", 2)
- assert result1 != result2
+def test_build_task_instance_run_id_same_input_gives_same_result():
+ result1 = OpenLineageAdapter.build_task_instance_run_id(
+ dag_id="dag1",
+ task_id="task1",
+ try_number=1,
+ execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ )
+ result2 = OpenLineageAdapter.build_task_instance_run_id(
+ dag_id="dag1",
+ task_id="task1",
+ try_number=1,
+ execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ )
+ assert result1 == result2
-def test_build_task_instance_run_id_uses_correct_methods_underneath():
- dag_id = "dag_1"
- task_id = "task_1"
- execution_date = "2023-01-01"
- try_number = 1
- expected = str(
- uuid.uuid3(uuid.NAMESPACE_URL,
f"{namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}")
+def test_build_task_instance_run_id_different_inputs_gives_different_results():
+ result1 = OpenLineageAdapter.build_task_instance_run_id(
+ dag_id="dag1",
+ task_id="task1",
+ try_number=1,
+ execution_date=datetime.datetime.now(),
)
- actual = OpenLineageAdapter.build_task_instance_run_id(dag_id, task_id,
execution_date, try_number)
- assert actual == expected
+ result2 = OpenLineageAdapter.build_task_instance_run_id(
+ dag_id="dag2",
+ task_id="task2",
+ try_number=2,
+ execution_date=datetime.datetime.now(),
+ )
+ assert result1 != result2
def test_configuration_precedence_when_creating_ol_client():
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index 905a00e444..ca5708bbba 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -165,8 +165,11 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
# Now you can use listener and task_instance in your tests to simulate
their interaction.
"""
- def mock_task_id(dag_id, task_id, execution_date, try_number):
- return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+ def mock_dag_id(dag_id, execution_date):
+ return f"{execution_date}.{dag_id}"
+
+ def mock_task_id(dag_id, task_id, try_number, execution_date):
+ return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
listener = OpenLineageListener()
listener.log = mock.Mock()
@@ -177,7 +180,7 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
listener.extractor_manager.extract_metadata.return_value = metadata
adapter = mock.Mock()
- adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
+ adapter.build_dag_run_id.side_effect = mock_dag_id
adapter.build_task_instance_run_id.side_effect = mock_task_id
adapter.start_task = mock.Mock()
adapter.fail_task = mock.Mock()
@@ -189,6 +192,7 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
task_instance.dag_run.run_id = "dag_run_run_id"
task_instance.dag_run.data_interval_start = None
task_instance.dag_run.data_interval_end = None
+ task_instance.dag_run.execution_date = "execution_date"
task_instance.task = mock.Mock()
task_instance.task.task_id = "task_id"
task_instance.task.dag = mock.Mock()
@@ -230,12 +234,12 @@ def
test_adapter_start_task_is_called_with_proper_arguments(
listener.on_task_instance_running(None, task_instance, None)
listener.adapter.start_task.assert_called_once_with(
- run_id="dag_id.task_id.execution_date.1",
+ run_id="execution_date.dag_id.task_id.1",
job_name="job_name",
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
parent_job_name="dag_id",
- parent_run_id="dag_id.dag_run_run_id",
+ parent_run_id="execution_date.dag_id",
code_location=None,
nominal_start_time=None,
nominal_end_time=None,
@@ -260,13 +264,16 @@ def
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
failure events, thus confirming that the adapter's failure handling is
functioning as expected.
"""
- def mock_task_id(dag_id, task_id, execution_date, try_number):
- return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+ def mock_dag_id(dag_id, execution_date):
+ return f"{execution_date}.{dag_id}"
+
+ def mock_task_id(dag_id, task_id, try_number, execution_date):
+ return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
+ mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
- mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
mock_disabled.return_value = False
listener.on_task_instance_failed(None, task_instance, None)
@@ -274,8 +281,8 @@ def
test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="dag_id.dag_run_run_id",
- run_id="dag_id.task_id.execution_date.1",
+ parent_run_id="execution_date.dag_id",
+ run_id="execution_date.dag_id.task_id.1",
task=listener.extractor_manager.extract_metadata(),
)
@@ -295,13 +302,16 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
during the task's lifecycle events.
"""
- def mock_task_id(dag_id, task_id, execution_date, try_number):
- return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+ def mock_dag_id(dag_id, execution_date):
+ return f"{execution_date}.{dag_id}"
+
+ def mock_task_id(dag_id, task_id, try_number, execution_date):
+ return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
listener, task_instance = _create_listener_and_task_instance()
mock_get_job_name.return_value = "job_name"
+ mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
- mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
mock_disabled.return_value = False
listener.on_task_instance_success(None, task_instance, None)
@@ -313,8 +323,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="dag_id.dag_run_run_id",
- run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_1}",
+ parent_run_id="execution_date.dag_id",
+ run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
)
@@ -328,8 +338,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="dag_id.dag_run_run_id",
- run_id=f"dag_id.task_id.execution_date.{EXPECTED_TRY_NUMBER_2}",
+ parent_run_id="execution_date.dag_id",
+ run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_2}",
task=listener.extractor_manager.extract_metadata(),
)
@@ -343,15 +353,14 @@ def
test_run_id_is_constant_across_all_methods(mocked_adapter):
try_number attribute, as it would occur in Airflow, to verify that the
run_id updates accordingly.
"""
- def mock_task_id(dag_id, task_id, execution_date, try_number):
+ def mock_task_id(dag_id, task_id, try_number, execution_date):
returned_try_number = try_number if AIRFLOW_V_2_10_PLUS else
max(try_number - 1, 1)
-
- return f"{dag_id}.{task_id}.{execution_date}.{returned_try_number}"
+ return f"{execution_date}.{dag_id}.{task_id}.{returned_try_number}"
listener, task_instance = _create_listener_and_task_instance()
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
- expected_run_id_1 = "dag_id.task_id.execution_date.1"
- expected_run_id_2 = "dag_id.task_id.execution_date.2"
+ expected_run_id_1 = "execution_date.dag_id.task_id.1"
+ expected_run_id_2 = "execution_date.dag_id.task_id.2"
listener.on_task_instance_running(None, task_instance, None)
assert listener.adapter.start_task.call_args.kwargs["run_id"] ==
expected_run_id_1
diff --git a/tests/providers/openlineage/plugins/test_macros.py
b/tests/providers/openlineage/plugins/test_macros.py
index a735312ab6..c8ae74aca4 100644
--- a/tests/providers/openlineage/plugins/test_macros.py
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-import uuid
+from datetime import datetime, timezone
from unittest import mock
from airflow.providers.openlineage.conf import namespace
@@ -38,8 +38,8 @@ def test_lineage_job_name():
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
try_number=1,
+ execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
)
assert lineage_job_name(task_instance) == "dag_id.task_id"
@@ -48,17 +48,18 @@ def test_lineage_run_id():
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
+ dag_run=mock.MagicMock(run_id="run_id"),
+ execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
try_number=1,
)
- actual = lineage_run_id(task_instance)
- expected = str(
- uuid.uuid3(
- uuid.NAMESPACE_URL,
- f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
- )
- )
- assert actual == expected
+
+ call_result1 = lineage_run_id(task_instance)
+ call_result2 = lineage_run_id(task_instance)
+
+ # random part value does not matter, it just have to be the same for the
same TaskInstance
+ assert call_result1 == call_result2
+ # execution_date is used as most significant bits of UUID
+ assert call_result1.startswith("016f5e9e-c4c8-")
@mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
@@ -67,8 +68,8 @@ def test_lineage_parent_id(mock_run_id):
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
try_number=1,
+ execution_date=datetime(2020, 1, 1, 1, 1, 1, 0, tzinfo=timezone.utc),
)
actual = lineage_parent_id(task_instance)
expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"