This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8640f3e397 move to dag_run.logical_date from execution date in OpenLineage provider (#41889)
8640f3e397 is described below
commit 8640f3e397ae23d7b6db8e020e82277de32e83e6
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 14:29:29 2024 +0200
move to dag_run.logical_date from execution date in OpenLineage provider (#41889)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/plugins/adapter.py | 10 +++----
airflow/providers/openlineage/plugins/listener.py | 6 ++--
.../providers/openlineage/plugins/test_adapter.py | 12 ++++----
.../providers/openlineage/plugins/test_listener.py | 34 +++++++++++-----------
4 files changed, 31 insertions(+), 31 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index 70b237d53b..847de5953d 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -118,10 +118,10 @@ class OpenLineageAdapter(LoggingMixin):
return yaml.safe_load(config_file)
@staticmethod
- def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
+ def build_dag_run_id(dag_id: str, logical_date: datetime) -> str:
return str(
generate_static_uuid(
- instant=execution_date,
+ instant=logical_date,
data=f"{conf.namespace()}.{dag_id}".encode(),
)
)
@@ -357,7 +357,7 @@ class OpenLineageAdapter(LoggingMixin):
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
- execution_date=dag_run.execution_date,
+ logical_date=dag_run.logical_date,
),
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
@@ -384,7 +384,7 @@ class OpenLineageAdapter(LoggingMixin):
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
- execution_date=dag_run.execution_date,
+ logical_date=dag_run.logical_date,
),
facets={**get_airflow_state_run_facet(dag_run),
**get_airflow_debug_facet()},
),
@@ -408,7 +408,7 @@ class OpenLineageAdapter(LoggingMixin):
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
- execution_date=dag_run.execution_date,
+ logical_date=dag_run.logical_date,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 6c59472ca7..7882e188d9 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -134,7 +134,7 @@ class OpenLineageListener:
return
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
- execution_date=dagrun.execution_date,
+ logical_date=dagrun.logical_date,
)
task_uuid = self.adapter.build_task_instance_run_id(
@@ -213,7 +213,7 @@ class OpenLineageListener:
def on_success():
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
- execution_date=dagrun.execution_date,
+ logical_date=dagrun.logical_date,
)
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
@@ -312,7 +312,7 @@ class OpenLineageListener:
def on_failure():
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
- execution_date=dagrun.execution_date,
+ logical_date=dagrun.logical_date,
)
task_uuid = OpenLineageAdapter.build_task_instance_run_id(
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index 18f457c5be..a1e702e263 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -823,10 +823,10 @@ def test_openlineage_adapter_stats_emit_failed(
def test_build_dag_run_id_is_valid_uuid():
dag_id = "test_dag"
- execution_date = datetime.datetime.now()
+ logical_date = datetime.datetime.now()
result = OpenLineageAdapter.build_dag_run_id(
dag_id=dag_id,
- execution_date=execution_date,
+ logical_date=logical_date,
)
uuid_result = uuid.UUID(result)
assert uuid_result
@@ -836,11 +836,11 @@ def test_build_dag_run_id_is_valid_uuid():
def test_build_dag_run_id_same_input_give_same_result():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
- execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
- execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
+ logical_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
assert result1 == result2
@@ -848,11 +848,11 @@ def test_build_dag_run_id_same_input_give_same_result():
def test_build_dag_run_id_different_inputs_give_different_results():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
- execution_date=datetime.datetime.now(),
+ logical_date=datetime.datetime.now(),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag2",
- execution_date=datetime.datetime.now(),
+ logical_date=datetime.datetime.now(),
)
assert result1 != result2
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index eed3a0ea0e..de2732cb36 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -170,8 +170,8 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
# Now you can use listener and task_instance in your tests to simulate
their interaction.
"""
- def mock_dag_id(dag_id, execution_date):
- return f"{execution_date}.{dag_id}"
+ def mock_dag_id(dag_id, logical_date):
+ return f"{logical_date}.{dag_id}"
def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -197,7 +197,7 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
task_instance.dag_run.run_id = "dag_run_run_id"
task_instance.dag_run.data_interval_start = None
task_instance.dag_run.data_interval_end = None
- task_instance.dag_run.execution_date = "execution_date"
+ task_instance.dag_run.execution_date = "logical_date"
task_instance.task = mock.Mock()
task_instance.task.task_id = "task_id"
task_instance.task.dag = mock.Mock()
@@ -210,7 +210,7 @@ def _create_listener_and_task_instance() ->
tuple[OpenLineageListener, TaskInsta
task_instance.state = State.RUNNING
task_instance.start_date = dt.datetime(2023, 1, 1, 13, 1, 1)
task_instance.end_date = dt.datetime(2023, 1, 3, 13, 1, 1)
- task_instance.execution_date = "execution_date"
+ task_instance.execution_date = "2020-01-01T01:01:01"
task_instance.next_method = None # Ensure this is None to reach start_task
return listener, task_instance
@@ -248,12 +248,12 @@ def
test_adapter_start_task_is_called_with_proper_arguments(
listener.on_task_instance_running(None, task_instance, None)
listener.adapter.start_task.assert_called_once_with(
- run_id="execution_date.dag_id.task_id.1",
+ run_id="2020-01-01T01:01:01.dag_id.task_id.1",
job_name="job_name",
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
parent_job_name="dag_id",
- parent_run_id="execution_date.dag_id",
+ parent_run_id="2020-01-01T01:01:01.dag_id",
code_location=None,
nominal_start_time=None,
nominal_end_time=None,
@@ -291,8 +291,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
failure events, thus confirming that the adapter's failure handling is
functioning as expected.
"""
- def mock_dag_id(dag_id, execution_date):
- return f"{execution_date}.{dag_id}"
+ def mock_dag_id(dag_id, logical_date):
+ return f"{logical_date}.{dag_id}"
def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -316,8 +316,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="execution_date.dag_id",
- run_id="execution_date.dag_id.task_id.1",
+ parent_run_id="2020-01-01T01:01:01.dag_id",
+ run_id="2020-01-01T01:01:01.dag_id.task_id.1",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
@@ -352,8 +352,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
during the task's lifecycle events.
"""
- def mock_dag_id(dag_id, execution_date):
- return f"{execution_date}.{dag_id}"
+ def mock_dag_id(dag_id, logical_date):
+ return f"{logical_date}.{dag_id}"
def mock_task_id(dag_id, task_id, try_number, execution_date):
return f"{execution_date}.{dag_id}.{task_id}.{try_number}"
@@ -375,8 +375,8 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
end_time="2023-01-03T13:01:01",
job_name="job_name",
parent_job_name="dag_id",
- parent_run_id="execution_date.dag_id",
- run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
+ parent_run_id="2020-01-01T01:01:01.dag_id",
+ run_id=f"2020-01-01T01:01:01.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
run_facets={
"custom_user_facet": 2,
@@ -399,7 +399,7 @@ def
test_on_task_instance_running_correctly_calls_openlineage_adapter_run_id_met
listener.adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
+ execution_date="2020-01-01T01:01:01",
try_number=1,
)
@@ -422,7 +422,7 @@ def
test_on_task_instance_failed_correctly_calls_openlineage_adapter_run_id_meth
mock_adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
+ execution_date="2020-01-01T01:01:01",
try_number=1,
)
@@ -441,7 +441,7 @@ def
test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met
mock_adapter.build_task_instance_run_id.assert_called_once_with(
dag_id="dag_id",
task_id="task_id",
- execution_date="execution_date",
+ execution_date="2020-01-01T01:01:01",
try_number=EXPECTED_TRY_NUMBER_1,
)