This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e26909df6af add clear_number to OpenLineage's dagrun-level event run id generation (#44617)
e26909df6af is described below

commit e26909df6af86cc18a272d993ad45ab17dfa333a
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sat Dec 7 13:23:48 2024 +0100

    add clear_number to OpenLineage's dagrun-level event run id generation (#44617)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../providers/openlineage/plugins/adapter.py       | 14 +++++++-----
 .../providers/openlineage/plugins/listener.py      |  6 +++++
 .../tests/openlineage/plugins/test_adapter.py      | 26 +++++++++++++++-------
 .../tests/openlineage/plugins/test_listener.py     | 12 +++++-----
 4 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index ec836b541a2..e6410991192 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -115,11 +115,11 @@ class OpenLineageAdapter(LoggingMixin):
             return yaml.safe_load(config_file)
 
     @staticmethod
-    def build_dag_run_id(dag_id: str, logical_date: datetime) -> str:
+    def build_dag_run_id(dag_id: str, logical_date: datetime, clear_number: 
int) -> str:
         return str(
             generate_static_uuid(
                 instant=logical_date,
-                data=f"{conf.namespace()}.{dag_id}".encode(),
+                data=f"{conf.namespace()}.{dag_id}.{clear_number}".encode(),
             )
         )
 
@@ -333,6 +333,7 @@ class OpenLineageAdapter(LoggingMixin):
         nominal_end_time: str,
         owners: list[str],
         run_facets: dict[str, RunFacet],
+        clear_number: int,
         description: str | None = None,
         job_facets: dict[str, JobFacet] | None = None,  # Custom job facets
     ):
@@ -349,8 +350,7 @@ class OpenLineageAdapter(LoggingMixin):
                 ),
                 run=self._build_run(
                     run_id=self.build_dag_run_id(
-                        dag_id=dag_id,
-                        logical_date=logical_date,
+                        dag_id=dag_id, logical_date=logical_date, 
clear_number=clear_number
                     ),
                     job_name=dag_id,
                     nominal_start_time=nominal_start_time,
@@ -374,6 +374,7 @@ class OpenLineageAdapter(LoggingMixin):
         run_id: str,
         end_date: datetime,
         logical_date: datetime,
+        clear_number: int,
         dag_run_state: DagRunState,
         task_ids: list[str],
     ):
@@ -384,8 +385,7 @@ class OpenLineageAdapter(LoggingMixin):
                 job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
                 run=Run(
                     runId=self.build_dag_run_id(
-                        dag_id=dag_id,
-                        logical_date=logical_date,
+                        dag_id=dag_id, logical_date=logical_date, 
clear_number=clear_number
                     ),
                     facets={
                         **get_airflow_state_run_facet(dag_id, run_id, 
task_ids, dag_run_state),
@@ -409,6 +409,7 @@ class OpenLineageAdapter(LoggingMixin):
         run_id: str,
         end_date: datetime,
         logical_date: datetime,
+        clear_number: int,
         dag_run_state: DagRunState,
         task_ids: list[str],
         msg: str,
@@ -422,6 +423,7 @@ class OpenLineageAdapter(LoggingMixin):
                     runId=self.build_dag_run_id(
                         dag_id=dag_id,
                         logical_date=logical_date,
+                        clear_number=clear_number,
                     ),
                     facets={
                         "errorMessage": error_message_run.ErrorMessageRunFacet(
diff --git a/providers/src/airflow/providers/openlineage/plugins/listener.py b/providers/src/airflow/providers/openlineage/plugins/listener.py
index 6a539ea27f4..7f0073d8f95 100644
--- a/providers/src/airflow/providers/openlineage/plugins/listener.py
+++ b/providers/src/airflow/providers/openlineage/plugins/listener.py
@@ -143,6 +143,7 @@ class OpenLineageListener:
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
                 logical_date=dagrun.logical_date,
+                clear_number=dagrun.clear_number,
             )
 
             if hasattr(task_instance, "logical_date"):
@@ -228,6 +229,7 @@ class OpenLineageListener:
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
                 logical_date=dagrun.logical_date,
+                clear_number=dagrun.clear_number,
             )
 
             if hasattr(task_instance, "logical_date"):
@@ -332,6 +334,7 @@ class OpenLineageListener:
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
                 logical_date=dagrun.logical_date,
+                clear_number=dagrun.clear_number,
             )
 
             if hasattr(task_instance, "logical_date"):
@@ -467,6 +470,7 @@ class OpenLineageListener:
                 nominal_start_time=data_interval_start,
                 nominal_end_time=data_interval_end,
                 run_facets=run_facets,
+                clear_number=dag_run.clear_number,
                 owners=[x.strip() for x in dag_run.dag.owner.split(",")] if 
dag_run.dag else None,
                 description=dag_run.dag.description if dag_run.dag else None,
                 # AirflowJobFacet should be created outside 
ProcessPoolExecutor that pickles objects,
@@ -502,6 +506,7 @@ class OpenLineageListener:
                 run_id=dag_run.run_id,
                 end_date=dag_run.end_date,
                 logical_date=dag_run.logical_date,
+                clear_number=dag_run.clear_number,
                 task_ids=task_ids,
                 dag_run_state=dag_run.get_state(),
             )
@@ -534,6 +539,7 @@ class OpenLineageListener:
                 run_id=dag_run.run_id,
                 end_date=dag_run.end_date,
                 logical_date=dag_run.logical_date,
+                clear_number=dag_run.clear_number,
                 dag_run_state=dag_run.get_state(),
                 task_ids=task_ids,
                 msg=msg,
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index a9787a9399a..a2266bb16bc 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -594,6 +594,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
         dag_id=dag_id,
         start_date=event_time,
         logical_date=event_time,
+        clear_number=0,
         nominal_start_time=event_time.isoformat(),
         nominal_end_time=event_time.isoformat(),
         owners=["airflow"],
@@ -708,6 +709,7 @@ def test_emit_dag_complete_event(
         run_id=run_id,
         end_date=event_time,
         logical_date=event_time,
+        clear_number=0,
         dag_run_state=DagRunState.SUCCESS,
         task_ids=["task_0", "task_1", "task_2.test"],
     )
@@ -797,6 +799,7 @@ def test_emit_dag_failed_event(
         run_id=run_id,
         end_date=event_time,
         logical_date=event_time,
+        clear_number=0,
         dag_run_state=DagRunState.FAILED,
         task_ids=["task_0", "task_1", "task_2.test"],
         msg="error msg",
@@ -864,6 +867,7 @@ def test_build_dag_run_id_is_valid_uuid():
     result = OpenLineageAdapter.build_dag_run_id(
         dag_id=dag_id,
         logical_date=logical_date,
+        clear_number=0,
     )
     uuid_result = uuid.UUID(result)
     assert uuid_result
@@ -872,24 +876,30 @@ def test_build_dag_run_id_is_valid_uuid():
 
 def test_build_dag_run_id_same_input_give_same_result():
     result1 = OpenLineageAdapter.build_dag_run_id(
-        dag_id="dag1",
-        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), 
clear_number=0
     )
     result2 = OpenLineageAdapter.build_dag_run_id(
-        dag_id="dag1",
-        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), 
clear_number=0
     )
     assert result1 == result2
 
 
 def test_build_dag_run_id_different_inputs_give_different_results():
     result1 = OpenLineageAdapter.build_dag_run_id(
-        dag_id="dag1",
-        logical_date=datetime.datetime.now(),
+        dag_id="dag1", logical_date=datetime.datetime.now(), clear_number=0
     )
     result2 = OpenLineageAdapter.build_dag_run_id(
-        dag_id="dag2",
-        logical_date=datetime.datetime.now(),
+        dag_id="dag2", logical_date=datetime.datetime.now(), clear_number=0
+    )
+    assert result1 != result2
+
+
+def test_build_dag_run_id_different_clear_number_give_different_results():
+    result1 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), 
clear_number=0
+    )
+    result2 = OpenLineageAdapter.build_dag_run_id(
+        dag_id="dag1", logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), 
clear_number=1
     )
     assert result1 != result2
 
diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py
index cbc4436ef01..ccb330faa5d 100644
--- a/providers/tests/openlineage/plugins/test_listener.py
+++ b/providers/tests/openlineage/plugins/test_listener.py
@@ -188,8 +188,8 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
         # Now you can use listener and task_instance in your tests to simulate 
their interaction.
     """
 
-    def mock_dag_id(dag_id, logical_date):
-        return f"{logical_date.isoformat()}.{dag_id}"
+    def mock_dag_id(dag_id, logical_date, clear_number):
+        return f"{logical_date.isoformat()}.{dag_id}.{clear_number}"
 
     def mock_task_id(dag_id, task_id, try_number, logical_date, map_index):
         return 
f"{logical_date.isoformat()}.{dag_id}.{task_id}.{try_number}.{map_index}"
@@ -214,6 +214,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
     task_instance.dag_run.run_id = "dag_run_run_id"
     task_instance.dag_run.data_interval_start = None
     task_instance.dag_run.data_interval_end = None
+    task_instance.dag_run.clear_number = 0
     if AIRFLOW_V_3_0_PLUS:
         task_instance.dag_run.logical_date = dt.datetime(2020, 1, 1, 1, 1, 1)
     else:
@@ -276,7 +277,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
         parent_job_name="dag_id",
-        parent_run_id="2020-01-01T01:01:01.dag_id",
+        parent_run_id="2020-01-01T01:01:01.dag_id.0",
         code_location=None,
         nominal_start_time=None,
         nominal_end_time=None,
@@ -330,7 +331,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="2020-01-01T01:01:01.dag_id",
+        parent_run_id="2020-01-01T01:01:01.dag_id.0",
         run_id="2020-01-01T01:01:01.dag_id.task_id.1.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
@@ -379,7 +380,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="2020-01-01T01:01:01.dag_id",
+        parent_run_id="2020-01-01T01:01:01.dag_id.0",
         
run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
@@ -653,6 +654,7 @@ def test_listener_logs_failed_serialization():
         run_id="",
         end_date=event_time,
         logical_date=callback_future,
+        clear_number=0,
         dag_run_state=DagRunState.FAILED,
         task_ids=["task_id"],
         msg="",

Reply via email to