This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e26909df6af add clear_number to OpenLineage's dagrun-level event run id generation (#44617)
e26909df6af is described below
commit e26909df6af86cc18a272d993ad45ab17dfa333a
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sat Dec 7 13:23:48 2024 +0100
add clear_number to OpenLineage's dagrun-level event run id generation (#44617)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
.../providers/openlineage/plugins/adapter.py | 14 +++++++-----
.../providers/openlineage/plugins/listener.py | 6 +++++
.../tests/openlineage/plugins/test_adapter.py | 26 +++++++++++++++-------
.../tests/openlineage/plugins/test_listener.py | 12 +++++-----
4 files changed, 39 insertions(+), 19 deletions(-)
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py
b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index ec836b541a2..e6410991192 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -115,11 +115,11 @@ class OpenLineageAdapter(LoggingMixin):
return yaml.safe_load(config_file)
@staticmethod
- def build_dag_run_id(dag_id: str, logical_date: datetime) -> str:
+ def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number:
int) -> str:
return str(
generate_static_uuid(
instant=logical_date,
- data=f"{conf.namespace()}.{dag_id}".encode(),
+ data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
)
)
@@ -333,6 +333,7 @@ class OpenLineageAdapter(LoggingMixin):
nominal_end_time: str,
owners: list[str],
run_facets: dict[str, RunFacet],
+ clear_number: int,
description: str | None = None,
job_facets: dict[str, JobFacet] | None = None, # Custom job facets
):
@@ -349,8 +350,7 @@ class OpenLineageAdapter(LoggingMixin):
),
run=self._build_run(
run_id=self.build_dag_run_id(
- dag_id=dag_id,
- logical_date=logical_date,
+ dag_id=dag_id, logical_date=logical_date,
clear_number=clear_number
),
job_name=dag_id,
nominal_start_time=nominal_start_time,
@@ -374,6 +374,7 @@ class OpenLineageAdapter(LoggingMixin):
run_id: str,
end_date: datetime,
logical_date: datetime,
+ clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
):
@@ -384,8 +385,7 @@ class OpenLineageAdapter(LoggingMixin):
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
- dag_id=dag_id,
- logical_date=logical_date,
+ dag_id=dag_id, logical_date=logical_date,
clear_number=clear_number
),
facets={
**get_airflow_state_run_facet(dag_id, run_id,
task_ids, dag_run_state),
@@ -409,6 +409,7 @@ class OpenLineageAdapter(LoggingMixin):
run_id: str,
end_date: datetime,
logical_date: datetime,
+ clear_number: int,
dag_run_state: DagRunState,
task_ids: list[str],
msg: str,
@@ -422,6 +423,7 @@ class OpenLineageAdapter(LoggingMixin):
runId=self.build_dag_run_id(
dag_id=dag_id,
logical_date=logical_date,
+ clear_number=clear_number,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py
b/providers/src/airflow/providers/openlineage/plugins/listener.py
index 6a539ea27f4..7f0073d8f95 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -143,6 +143,7 @@ class OpenLineageListener:
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
logical_date=dagrun.logical_date,
+ clear_number=dagrun.clear_number,
)
if hasattr(task_instance, "logical_date"):
@@ -228,6 +229,7 @@ class OpenLineageListener:
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
logical_date=dagrun.logical_date,
+ clear_number=dagrun.clear_number,
)
if hasattr(task_instance, "logical_date"):
@@ -332,6 +334,7 @@ class OpenLineageListener:
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
logical_date=dagrun.logical_date,
+ clear_number=dagrun.clear_number,
)
if hasattr(task_instance, "logical_date"):
@@ -467,6 +470,7 @@ class OpenLineageListener:
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
run_facets=run_facets,
+ clear_number=dag_run.clear_number,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if
dag_run.dag else None,
description=dag_run.dag.description if dag_run.dag else None,
# AirflowJobFacet should be created outside
ProcessPoolExecutor that pickles objects,
@@ -502,6 +506,7 @@ class OpenLineageListener:
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
+ clear_number=dag_run.clear_number,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
)
@@ -534,6 +539,7 @@ class OpenLineageListener:
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
+ clear_number=dag_run.clear_number,
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
diff --git a/providers/tests/openlineage/plugins/test_adapter.py
b/providers/tests/openlineage/plugins/test_adapter.py
index a9787a9399a..a2266bb16bc 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -594,6 +594,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
dag_id=dag_id,
start_date=event_time,
logical_date=event_time,
+ clear_number=0,
nominal_start_time=event_time.isoformat(),
nominal_end_time=event_time.isoformat(),
owners=["airflow"],
@@ -708,6 +709,7 @@ def test_emit_dag_complete_event(
run_id=run_id,
end_date=event_time,
logical_date=event_time,
+ clear_number=0,
dag_run_state=DagRunState.SUCCESS,
task_ids=["task_0", "task_1", "task_2.test"],
)
@@ -797,6 +799,7 @@ def test_emit_dag_failed_event(
run_id=run_id,
end_date=event_time,
logical_date=event_time,
+ clear_number=0,
dag_run_state=DagRunState.FAILED,
task_ids=["task_0", "task_1", "task_2.test"],
msg="error msg",
@@ -864,6 +867,7 @@ def test_build_dag_run_id_is_valid_uuid():
result = OpenLineageAdapter.build_dag_run_id(
dag_id=dag_id,
logical_date=logical_date,
+ clear_number=0,
)
uuid_result = uuid.UUID(result)
assert uuid_result
@@ -872,24 +876,30 @@ def test_build_dag_run_id_is_valid_uuid():
def test_build_dag_run_id_same_input_give_same_result():
result1 = OpenLineageAdapter.build_dag_run_id(
- dag_id="dag1",
- logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
clear_number=0
)
result2 = OpenLineageAdapter.build_dag_run_id(
- dag_id="dag1",
- logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
clear_number=0
)
assert result1 == result2
def test_build_dag_run_id_different_inputs_give_different_results():
result1 = OpenLineageAdapter.build_dag_run_id(
- dag_id="dag1",
- logical_date=datetime.datetime.now(),
+ dag_id="dag1", logical_date=datetime.datetime.now(), clear_number=0
)
result2 = OpenLineageAdapter.build_dag_run_id(
- dag_id="dag2",
- logical_date=datetime.datetime.now(),
+ dag_id="dag2", logical_date=datetime.datetime.now(), clear_number=0
+ )
+ assert result1 != result2
+
+
+def test_build_dag_run_id_different_clear_number_give_different_results():
+ result1 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
clear_number=0
+ )
+ result2 = OpenLineageAdapter.build_dag_run_id(
+ dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
clear_number=1
)
assert result1 != result2
diff --git a/providers/tests/openlineage/plugins/test_listener.py
b/providers/tests/openlineage/plugins/test_listener.py
index cbc4436ef01..ccb330faa5d 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -188,8 +188,8 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
# Now you can use listener and task_instance in your tests to simulate
their interaction.
"""
- def mock_dag_id(dag_id, logical_date):
- return f"{logical_date.isoformat()}.{dag_id}"
+ def mock_dag_id(dag_id, logical_date, clear_number):
+ return f"{logical_date.isoformat()}.{dag_id}.{clear_number}"
def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
return
f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{map_index}"
@@ -214,6 +214,7 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
task_instance.dag_run.run_id = "dag_run_run_id"
task_instance.dag_run.data_interval_start = None
task_instance.dag_run.data_interval_end = None
+ task_instance.dag_run.clear_number = 0
if AIRFLOW_V_3_0_PLUS:
task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
else:
@@ -276,7 +277,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
parent_job_name="dag_id",
- parent_run_id="2020-01-01T01:01:01.dag_id",
+ parent_run_id="2020-01-01T01:01:01.dag_id.0",
code_location=None,
nominal_start_time=None,
nominal_end_time=None,
@@ -330,7 +331,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="2020-01-01T01:01:01.dag_id",
+ parent_run_id="2020-01-01T01:01:01.dag_id.0",
run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
@@ -379,7 +380,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="2020-01-01T01:01:01.dag_id",
+ parent_run_id="2020-01-01T01:01:01.dag_id.0",
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
@@ -653,6 +654,7 @@ def test_listener_logs_failed_serialization():
run_id="",
end_date=event_time,
logical_date=callback_future,
+ clear_number=0,
dag_run_state=DagRunState.FAILED,
task_ids=["task_id"],
msg="",