This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8640f3e397 move to dag_run.logical_date from execution date in 
OpenLineage provider (#41889)
8640f3e397 is described below

commit 8640f3e397ae23d7b6db8e020e82277de32e83e6
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 14:29:29 2024 +0200

    move to dag_run.logical_date from execution date in OpenLineage provider 
(#41889)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/plugins/adapter.py   | 10 +++----
 airflow/providers/openlineage/plugins/listener.py  |  6 ++--
 .../providers/openlineage/plugins/test_adapter.py  | 12 ++++----
 .../providers/openlineage/plugins/test_listener.py | 34 +++++++++++-----------
 4 files changed, 31 insertions(+), 31 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index 70b237d53b..847de5953d 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -118,10 +118,10 @@ class OpenLineageAdapter(LoggingMixin):
             return yaml.safe_load(config_file)
 
     @staticmethod
-    def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
+    def build_dag_run_id(dag_id: str, logical_date: datetime) -> str:
         return str(
             generate_static_uuid(
-                instant=execution_date,
+                instant=logical_date,
                 data=f"{conf.namespace()}.{dag_id}".encode(),
             )
         )
@@ -357,7 +357,7 @@ class OpenLineageAdapter(LoggingMixin):
                 run=self._build_run(
                     run_id=self.build_dag_run_id(
                         dag_id=dag_run.dag_id,
-                        execution_date=dag_run.execution_date,
+                        logical_date=dag_run.logical_date,
                     ),
                     job_name=dag_run.dag_id,
                     nominal_start_time=nominal_start_time,
@@ -384,7 +384,7 @@ class OpenLineageAdapter(LoggingMixin):
                 run=Run(
                     runId=self.build_dag_run_id(
                         dag_id=dag_run.dag_id,
-                        execution_date=dag_run.execution_date,
+                        logical_date=dag_run.logical_date,
                     ),
                     facets={**get_airflow_state_run_facet(dag_run), 
**get_airflow_debug_facet()},
                 ),
@@ -408,7 +408,7 @@ class OpenLineageAdapter(LoggingMixin):
                 run=Run(
                     runId=self.build_dag_run_id(
                         dag_id=dag_run.dag_id,
-                        execution_date=dag_run.execution_date,
+                        logical_date=dag_run.logical_date,
                     ),
                     facets={
                         "errorMessage": error_message_run.ErrorMessageRunFacet(
diff --git a/airflow/providers/openlineage/plugins/listener.py 
b/airflow/providers/openlineage/plugins/listener.py
index 6c59472ca7..7882e188d9 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -134,7 +134,7 @@ class OpenLineageListener:
                 return
             parent_run_id = self.adapter.build_dag_run_id(
                 dag_id=dag.dag_id,
-                execution_date=dagrun.execution_date,
+                logical_date=dagrun.logical_date,
             )
 
             task_uuid = self.adapter.build_task_instance_run_id(
@@ -213,7 +213,7 @@ class OpenLineageListener:
         def on_success():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(
                 dag_id=dag.dag_id,
-                execution_date=dagrun.execution_date,
+                logical_date=dagrun.logical_date,
             )
 
             task_uuid = OpenLineageAdapter.build_task_instance_run_id(
@@ -312,7 +312,7 @@ class OpenLineageListener:
         def on_failure():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(
                 dag_id=dag.dag_id,
-                execution_date=dagrun.execution_date,
+                logical_date=dagrun.logical_date,
             )
 
             task_uuid = OpenLineageAdapter.build_task_instance_run_id(
diff --git a/tests/providers/openlineage/plugins/test_adapter.py 
b/tests/providers/openlineage/plugins/test_adapter.py
index 18f457c5be..a1e702e263 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -823,10 +823,10 @@ def test_openlineage_adapter_stats_emit_failed(
 
 def test_build_dag_run_id_is_valid_uuid():
     dag_id = "test_dag"
-    execution_date = datetime.datetime.now()
+    logical_date = datetime.datetime.now()
     result = OpenLineageAdapter.build_dag_run_id(
         dag_id=dag_id,
-        execution_date=execution_date,
+        logical_date=logical_date,
     )
     uuid_result = uuid.UUID(result)
     assert uuid_result
@@ -836,11 +836,11 @@ def test_build_dag_run_id_is_valid_uuid():
 def test_build_dag_run_id_same_input_give_same_result():
     result1 = OpenLineageAdapter.build_dag_run_id(
         dag_id="dag1",
-        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
     )
     result2 = OpenLineageAdapter.build_dag_run_id(
         dag_id="dag1",
-        execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+        logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
     )
     assert result1 == result2
 
@@ -848,11 +848,11 @@ def test_build_dag_run_id_same_input_give_same_result():
 def test_build_dag_run_id_different_inputs_give_different_results():
     result1 = OpenLineageAdapter.build_dag_run_id(
         dag_id="dag1",
-        execution_date=datetime.datetime.now(),
+        logical_date=datetime.datetime.now(),
     )
     result2 = OpenLineageAdapter.build_dag_run_id(
         dag_id="dag2",
-        execution_date=datetime.datetime.now(),
+        logical_date=datetime.datetime.now(),
     )
     assert result1 != result2
 
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index eed3a0ea0e..de2732cb36 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -170,8 +170,8 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
         # Now you can use listener and task_instance in your tests to simulate 
their interaction.
     """
 
-    def mock_dag_id(dag_id, execution_date):
-        return f"{execution_date}.{dag_id}"
+    def mock_dag_id(dag_id, logical_date):
+        return f"{logical_date}.{dag_id}"
 
     def mock_task_id(dag_id, task_id, try_number, execution_date):
         return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -197,7 +197,7 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
     task_instance.dag_run.run_id = "dag_run_run_id"
     task_instance.dag_run.data_interval_start = None
     task_instance.dag_run.data_interval_end = None
-    task_instance.dag_run.execution_date = "execution_date"
+    task_instance.dag_run.execution_date = "logical_date"
     task_instance.task = mock.Mock()
     task_instance.task.task_id = "task_id"
     task_instance.task.dag = mock.Mock()
@@ -210,7 +210,7 @@ def _create_listener_and_task_instance() -> 
tuple[OpenLineageListener, TaskInsta
     task_instance.state = State.RUNNING
     task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
     task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
-    task_instance.execution_date = "execution_date"
+    task_instance.execution_date = "2020-01-01T01:01:01"
     task_instance.next_method = None  # Ensure this is None to reach start_task
 
     return listener, task_instance
@@ -248,12 +248,12 @@ def 
test_adapter_start_task_is_called_with_proper_arguments(
 
     listener.on_task_instance_running(None, task_instance, None)
     listener.adapter.start_task.assert_called_once_with(
-        run_id="execution_date.dag_id.task_id.1",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
         job_name="job_name",
         job_description="Test DAG Description",
         event_time="2023-01-01T13:01:01",
         parent_job_name="dag_id",
-        parent_run_id="execution_date.dag_id",
+        parent_run_id="2020-01-01T01:01:01.dag_id",
         code_location=None,
         nominal_start_time=None,
         nominal_end_time=None,
@@ -291,8 +291,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     failure events, thus confirming that the adapter's failure handling is 
functioning as expected.
     """
 
-    def mock_dag_id(dag_id, execution_date):
-        return f"{execution_date}.{dag_id}"
+    def mock_dag_id(dag_id, logical_date):
+        return f"{logical_date}.{dag_id}"
 
     def mock_task_id(dag_id, task_id, try_number, execution_date):
         return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -316,8 +316,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="execution_date.dag_id",
-        run_id="execution_date.dag_id.task_id.1",
+        parent_run_id="2020-01-01T01:01:01.dag_id",
+        run_id="2020-01-01T01:01:01.dag_id.task_id.1",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -352,8 +352,8 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
     during the task's lifecycle events.
     """
 
-    def mock_dag_id(dag_id, execution_date):
-        return f"{execution_date}.{dag_id}"
+    def mock_dag_id(dag_id, logical_date):
+        return f"{logical_date}.{dag_id}"
 
     def mock_task_id(dag_id, task_id, try_number, execution_date):
         return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -375,8 +375,8 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         end_time="2023-01-03T13:01:01",
         job_name="job_name",
         parent_job_name="dag_id",
-        parent_run_id="execution_date.dag_id",
-        run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
+        parent_run_id="2020-01-01T01:01:01.dag_id",
+        run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
         task=listener.extractor_manager.extract_metadata(),
         run_facets={
             "custom_user_facet": 2,
@@ -399,7 +399,7 @@ def 
test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met
     listener.adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
+        execution_date="2020-01-01T01:01:01",
         try_number=1,
     )
 
@@ -422,7 +422,7 @@ def 
test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth
     mock_adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
+        execution_date="2020-01-01T01:01:01",
         try_number=1,
     )
 
@@ -441,7 +441,7 @@ def 
test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met
     mock_adapter.build_task_instance_run_id.assert_called_once_with(
         dag_id="dag_id",
         task_id="task_id",
-        execution_date="execution_date",
+        execution_date="2020-01-01T01:01:01",
         try_number=EXPECTED_TRY_NUMBER_1,
     )
 

Reply via email to