This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch move-to-logical-date-ol-provider in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d38e89c7eca2a40b5646187bcdf22b3f80fb5890 Author: Maciej Obuchowski <[email protected]> AuthorDate: Fri Aug 30 14:28:47 2024 +0200 move to dag_run.logical_date from execution date in OpenLineage provider Signed-off-by: Maciej Obuchowski <[email protected]> --- airflow/providers/openlineage/plugins/adapter.py | 10 +++---- airflow/providers/openlineage/plugins/listener.py | 6 ++-- .../providers/openlineage/plugins/test_adapter.py | 12 ++++---- .../providers/openlineage/plugins/test_listener.py | 34 +++++++++++----------- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 70b237d53b..847de5953d 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -118,10 +118,10 @@ class OpenLineageAdapter(LoggingMixin): return yaml.safe_load(config_file) @staticmethod - def build_dag_run_id(dag_id: str, execution_date: datetime) -> str: + def build_dag_run_id(dag_id: str, logical_date: datetime) -> str: return str( generate_static_uuid( - instant=execution_date, + instant=logical_date, data=f"{conf.namespace()}.{dag_id}".encode(), ) ) @@ -357,7 +357,7 @@ class OpenLineageAdapter(LoggingMixin): run=self._build_run( run_id=self.build_dag_run_id( dag_id=dag_run.dag_id, - execution_date=dag_run.execution_date, + logical_date=dag_run.logical_date, ), job_name=dag_run.dag_id, nominal_start_time=nominal_start_time, @@ -384,7 +384,7 @@ class OpenLineageAdapter(LoggingMixin): run=Run( runId=self.build_dag_run_id( dag_id=dag_run.dag_id, - execution_date=dag_run.execution_date, + logical_date=dag_run.logical_date, ), facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()}, ), @@ -408,7 +408,7 @@ class OpenLineageAdapter(LoggingMixin): run=Run( runId=self.build_dag_run_id( dag_id=dag_run.dag_id, - execution_date=dag_run.execution_date, + logical_date=dag_run.logical_date, ), facets={ 
"errorMessage": error_message_run.ErrorMessageRunFacet( diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 6c59472ca7..7882e188d9 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -134,7 +134,7 @@ class OpenLineageListener: return parent_run_id = self.adapter.build_dag_run_id( dag_id=dag.dag_id, - execution_date=dagrun.execution_date, + logical_date=dagrun.logical_date, ) task_uuid = self.adapter.build_task_instance_run_id( @@ -213,7 +213,7 @@ class OpenLineageListener: def on_success(): parent_run_id = OpenLineageAdapter.build_dag_run_id( dag_id=dag.dag_id, - execution_date=dagrun.execution_date, + logical_date=dagrun.logical_date, ) task_uuid = OpenLineageAdapter.build_task_instance_run_id( @@ -312,7 +312,7 @@ class OpenLineageListener: def on_failure(): parent_run_id = OpenLineageAdapter.build_dag_run_id( dag_id=dag.dag_id, - execution_date=dagrun.execution_date, + logical_date=dagrun.logical_date, ) task_uuid = OpenLineageAdapter.build_task_instance_run_id( diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index 19bba5fffb..b4d81fedf5 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -826,10 +826,10 @@ def test_openlineage_adapter_stats_emit_failed( def test_build_dag_run_id_is_valid_uuid(): dag_id = "test_dag" - execution_date = datetime.datetime.now() + logical_date = datetime.datetime.now() result = OpenLineageAdapter.build_dag_run_id( dag_id=dag_id, - execution_date=execution_date, + logical_date=logical_date, ) uuid_result = uuid.UUID(result) assert uuid_result @@ -839,11 +839,11 @@ def test_build_dag_run_id_is_valid_uuid(): def test_build_dag_run_id_same_input_give_same_result(): result1 = OpenLineageAdapter.build_dag_run_id( dag_id="dag1", - 
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), ) result2 = OpenLineageAdapter.build_dag_run_id( dag_id="dag1", - execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1), + logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1), ) assert result1 == result2 @@ -851,11 +851,11 @@ def test_build_dag_run_id_same_input_give_same_result(): def test_build_dag_run_id_different_inputs_give_different_results(): result1 = OpenLineageAdapter.build_dag_run_id( dag_id="dag1", - execution_date=datetime.datetime.now(), + logical_date=datetime.datetime.now(), ) result2 = OpenLineageAdapter.build_dag_run_id( dag_id="dag2", - execution_date=datetime.datetime.now(), + logical_date=datetime.datetime.now(), ) assert result1 != result2 diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index eed3a0ea0e..de2732cb36 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -170,8 +170,8 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta # Now you can use listener and task_instance in your tests to simulate their interaction. 
""" - def mock_dag_id(dag_id, execution_date): - return f"{execution_date}.{dag_id}" + def mock_dag_id(dag_id, logical_date): + return f"{logical_date}.{dag_id}" def mock_task_id(dag_id, task_id, try_number, execution_date): return f"{execution_date}.{dag_id}.{task_id}.{try_number}" @@ -197,7 +197,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta task_instance.dag_run.run_id = "dag_run_run_id" task_instance.dag_run.data_interval_start = None task_instance.dag_run.data_interval_end = None - task_instance.dag_run.execution_date = "execution_date" + task_instance.dag_run.execution_date = "logical_date" task_instance.task = mock.Mock() task_instance.task.task_id = "task_id" task_instance.task.dag = mock.Mock() @@ -210,7 +210,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta task_instance.state = State.RUNNING task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1) task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1) - task_instance.execution_date = "execution_date" + task_instance.execution_date = "2020-01-01T01:01:01" task_instance.next_method = None # Ensure this is None to reach start_task return listener, task_instance @@ -248,12 +248,12 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener.on_task_instance_running(None, task_instance, None) listener.adapter.start_task.assert_called_once_with( - run_id="execution_date.dag_id.task_id.1", + run_id="2020-01-01T01:01:01.dag_id.task_id.1", job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01", parent_job_name="dag_id", - parent_run_id="execution_date.dag_id", + parent_run_id="2020-01-01T01:01:01.dag_id", code_location=None, nominal_start_time=None, nominal_end_time=None, @@ -291,8 +291,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( failure events, thus confirming that the adapter's failure handling is functioning as expected. 
""" - def mock_dag_id(dag_id, execution_date): - return f"{execution_date}.{dag_id}" + def mock_dag_id(dag_id, logical_date): + return f"{logical_date}.{dag_id}" def mock_task_id(dag_id, task_id, try_number, execution_date): return f"{execution_date}.{dag_id}.{task_id}.{try_number}" @@ -316,8 +316,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( end_time="2023-01-03T13:01:01", job_name="job_name", parent_job_name="dag_id", - parent_run_id="execution_date.dag_id", - run_id="execution_date.dag_id.task_id.1", + parent_run_id="2020-01-01T01:01:01.dag_id", + run_id="2020-01-01T01:01:01.dag_id.task_id.1", task=listener.extractor_manager.extract_metadata(), run_facets={ "custom_user_facet": 2, @@ -352,8 +352,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( during the task's lifecycle events. """ - def mock_dag_id(dag_id, execution_date): - return f"{execution_date}.{dag_id}" + def mock_dag_id(dag_id, logical_date): + return f"{logical_date}.{dag_id}" def mock_task_id(dag_id, task_id, try_number, execution_date): return f"{execution_date}.{dag_id}.{task_id}.{try_number}" @@ -375,8 +375,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( end_time="2023-01-03T13:01:01", job_name="job_name", parent_job_name="dag_id", - parent_run_id="execution_date.dag_id", - run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}", + parent_run_id="2020-01-01T01:01:01.dag_id", + run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}", task=listener.extractor_manager.extract_metadata(), run_facets={ "custom_user_facet": 2, @@ -399,7 +399,7 @@ def test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met listener.adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", task_id="task_id", - execution_date="execution_date", + execution_date="2020-01-01T01:01:01", try_number=1, ) @@ -422,7 +422,7 @@ def test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth 
mock_adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", task_id="task_id", - execution_date="execution_date", + execution_date="2020-01-01T01:01:01", try_number=1, ) @@ -441,7 +441,7 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met mock_adapter.build_task_instance_run_id.assert_called_once_with( dag_id="dag_id", task_id="task_id", - execution_date="execution_date", + execution_date="2020-01-01T01:01:01", try_number=EXPECTED_TRY_NUMBER_1, )
