This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95a83102e8 feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659)
95a83102e8 is described below
commit 95a83102e8753c2f8caf5b0d5c847f4c7f254f67
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Jan 9 13:15:07 2024 +0100
feat: Add dag_id when generating OpenLineage run_id for task instance. (#36659)
---
airflow/providers/dbt/cloud/utils/openlineage.py | 5 +-
airflow/providers/openlineage/plugins/adapter.py | 4 +-
airflow/providers/openlineage/plugins/listener.py | 27 ++++++----
airflow/providers/openlineage/plugins/macros.py | 10 +++-
tests/always/test_project_structure.py | 1 -
.../providers/openlineage/plugins/test_listener.py | 57 +++++++++++++++++-----
tests/providers/openlineage/plugins/test_macros.py | 52 ++++++++++++++++++++
.../plugins/test_openlineage_adapter.py | 44 ++++++++---------
8 files changed, 149 insertions(+), 51 deletions(-)
diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py
b/airflow/providers/dbt/cloud/utils/openlineage.py
index 6a0934d412..f86c77a689 100644
--- a/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -121,7 +121,10 @@ def generate_openlineage_events_from_dbt_cloud_run(
     # generate same run id of current task instance
     parent_run_id = OpenLineageAdapter.build_task_instance_run_id(
-        operator.task_id, task_instance.execution_date, task_instance.try_number - 1
+        dag_id=task_instance.dag_id,
+        task_id=operator.task_id,
+        execution_date=task_instance.execution_date,
+        try_number=task_instance.try_number - 1,
     )
     parent_job = ParentRunMetadata(
parent_job = ParentRunMetadata(
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index ad648f8828..6f16b01706 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -102,11 +102,11 @@ class OpenLineageAdapter(LoggingMixin):
         return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}"))

     @staticmethod
-    def build_task_instance_run_id(task_id, execution_date, try_number):
+    def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
         return str(
             uuid.uuid3(
                 uuid.NAMESPACE_URL,
-                f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}",
+                f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}",
             )
         )
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 6fa02b9fc1..8c731dd6ff 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -77,7 +77,10 @@ class OpenLineageListener:
             parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
             task_uuid = self.adapter.build_task_instance_run_id(
-                task.task_id, task_instance.execution_date, task_instance.try_number
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number,
             )
             task_metadata = self.extractor_manager.extract_metadata(dagrun, task)
@@ -116,14 +119,17 @@ class OpenLineageListener:
         task = task_instance.task
         dag = task.dag

-        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
-            task.task_id, task_instance.execution_date, task_instance.try_number - 1
-        )
-
         @print_warning(self.log)
         def on_success():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
+            task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number - 1,
+            )
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
@@ -149,14 +155,17 @@ class OpenLineageListener:
         task = task_instance.task
         dag = task.dag

-        task_uuid = OpenLineageAdapter.build_task_instance_run_id(
-            task.task_id, task_instance.execution_date, task_instance.try_number
-        )
-
         @print_warning(self.log)
         def on_failure():
             parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
+            task_uuid = OpenLineageAdapter.build_task_instance_run_id(
+                dag_id=dag.dag_id,
+                task_id=task.task_id,
+                execution_date=task_instance.execution_date,
+                try_number=task_instance.try_number,
+            )
+
             task_metadata = self.extractor_manager.extract_metadata(
                 dagrun, task, complete=True, task_instance=task_instance
             )
diff --git a/airflow/providers/openlineage/plugins/macros.py
b/airflow/providers/openlineage/plugins/macros.py
index 61af81a1eb..a4039db2f4 100644
--- a/airflow/providers/openlineage/plugins/macros.py
+++ b/airflow/providers/openlineage/plugins/macros.py
@@ -39,7 +39,10 @@ def lineage_run_id(task_instance: TaskInstance):
         :ref:`howto/macros:openlineage`
     """
     return OpenLineageAdapter.build_task_instance_run_id(
-        task_instance.task.task_id, task_instance.execution_date, task_instance.try_number
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task.task_id,
+        execution_date=task_instance.execution_date,
+        try_number=task_instance.try_number,
     )
@@ -55,6 +58,9 @@ def lineage_parent_id(run_id: str, task_instance: TaskInstance):
         :ref:`howto/macros:openlineage`
     """
     job_name = OpenLineageAdapter.build_task_instance_run_id(
-        task_instance.task.task_id, task_instance.execution_date, task_instance.try_number
+        dag_id=task_instance.dag_id,
+        task_id=task_instance.task.task_id,
+        execution_date=task_instance.execution_date,
+        try_number=task_instance.try_number,
     )
     return f"{_JOB_NAMESPACE}/{job_name}/{run_id}"
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index c88387919e..ca59b70c6f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -163,7 +163,6 @@ class TestProjectStructure:
"tests/providers/openlineage/extractors/test_manager.py",
"tests/providers/openlineage/plugins/test_adapter.py",
"tests/providers/openlineage/plugins/test_facets.py",
- "tests/providers/openlineage/plugins/test_macros.py",
"tests/providers/openlineage/test_sqlparser.py",
"tests/providers/redis/operators/test_redis_publish.py",
"tests/providers/redis/sensors/test_redis_key.py",
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index c616f77f97..827c17c9f7 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -151,6 +151,10 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
     listener, task_instance = _create_listener_and_task_instance()
     # Now you can use listener and task_instance in your tests to simulate their interaction.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
listener = OpenLineageListener()
listener.log = mock.Mock()
listener.extractor_manager = mock.Mock()
@@ -161,7 +165,7 @@ def _create_listener_and_task_instance() -> tuple[OpenLineageListener, TaskInsta
     adapter = mock.Mock()
     adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"
-    adapter.build_task_instance_run_id.side_effect = lambda x, y, z: f"{x}.{y}.{z}"
+    adapter.build_task_instance_run_id.side_effect = mock_task_id
     adapter.start_task = mock.Mock()
     adapter.fail_task = mock.Mock()
     adapter.complete_task = mock.Mock()
@@ -211,7 +215,7 @@ def test_adapter_start_task_is_called_with_proper_arguments(
listener.on_task_instance_running(None, task_instance, None)
listener.adapter.start_task.assert_called_once_with(
- run_id="task_id.execution_date.1",
+ run_id="dag_id.task_id.execution_date.1",
job_name="job_name",
job_description="Test DAG Description",
event_time="2023-01-01T13:01:01",
@@ -239,9 +243,13 @@ def test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
     the test verifies the integrity and consistency of the data passed to the adapter during task
     failure events, thus confirming that the adapter's failure handling is functioning as expected.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"

     listener.on_task_instance_failed(None, task_instance, None)
@@ -250,7 +258,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(mock_get_job_name, mo
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
@@ -266,9 +274,13 @@ def test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
     accordingly. This helps confirm the consistency and correctness of the data passed to the adapter
     during the task's lifecycle events.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
     mock_get_job_name.return_value = "job_name"
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mocked_adapter.build_dag_run_id.side_effect = lambda x, y: f"{x}.{y}"

     listener.on_task_instance_success(None, task_instance, None)
@@ -279,7 +291,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.0",
+        run_id="dag_id.task_id.execution_date.0",
         task=listener.extractor_manager.extract_metadata(),
     )
@@ -292,7 +304,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments(mock_get_job_name
         job_name="job_name",
         parent_job_name="dag_id",
         parent_run_id="dag_id.dag_run_run_id",
-        run_id="task_id.execution_date.1",
+        run_id="dag_id.task_id.execution_date.1",
         task=listener.extractor_manager.extract_metadata(),
     )
@@ -305,12 +317,16 @@ def test_run_id_is_constant_across_all_methods(mocked_adapter):
     reflecting the task's identity and execution context. The test also simulates the change in the
     try_number attribute, as it would occur in Airflow, to verify that the run_id updates accordingly.
     """
+
+    def mock_task_id(dag_id, task_id, execution_date, try_number):
+        return f"{dag_id}.{task_id}.{execution_date}.{try_number}"
+
     listener, task_instance = _create_listener_and_task_instance()
-    mocked_adapter.build_task_instance_run_id.side_effect = lambda x, y, z: f"{x}.{y}.{z}"
+    mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id

     listener.on_task_instance_running(None, task_instance, None)
     expected_run_id = listener.adapter.start_task.call_args.kwargs["run_id"]
-    assert expected_run_id == "task_id.execution_date.1"
+    assert expected_run_id == "dag_id.task_id.execution_date.1"

     listener.on_task_instance_failed(None, task_instance, None)
     assert listener.adapter.fail_task.call_args.kwargs["run_id"] == expected_run_id
@@ -318,7 +334,7 @@ def test_run_id_is_constant_across_all_methods(mocked_adapter):
     # This run_id will be different as we did NOT simulate increase of the try_number attribute,
     # which happens in Airflow.
     listener.on_task_instance_success(None, task_instance, None)
-    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == "task_id.execution_date.0"
+    assert listener.adapter.complete_task.call_args.kwargs["run_id"] == "dag_id.task_id.execution_date.0"

     # Now we simulate the increase of try_number, and the run_id should reflect that change.
     # This is how airflow works, and that's why we expect the run_id to remain constant across all methods.
@@ -336,7 +352,12 @@ def test_running_task_correctly_calls_openlineage_adapter_run_id_method():
     """
     listener, task_instance = _create_listener_and_task_instance()
     listener.on_task_instance_running(None, task_instance, None)
-    listener.adapter.build_task_instance_run_id.assert_called_once_with("task_id", "execution_date", 1)
+    listener.adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
+    )
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@@ -349,7 +370,12 @@ def test_failed_task_correctly_calls_openlineage_adapter_run_id_method(mock_adap
     """
     listener, task_instance = _create_listener_and_task_instance()
     listener.on_task_instance_failed(None, task_instance, None)
-    mock_adapter.build_task_instance_run_id.assert_called_with("task_id", "execution_date", 1)
+    mock_adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=1,
+    )
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
@@ -362,7 +388,12 @@ def test_successful_task_correctly_calls_openlineage_adapter_run_id_method(mock_
     """
    listener, task_instance = _create_listener_and_task_instance()
    listener.on_task_instance_success(None, task_instance, None)
-    mock_adapter.build_task_instance_run_id.assert_called_with("task_id", "execution_date", 0)
+    mock_adapter.build_task_instance_run_id.assert_called_once_with(
+        dag_id="dag_id",
+        task_id="task_id",
+        execution_date="execution_date",
+        try_number=0,
+    )
@mock.patch("airflow.models.taskinstance.get_listener_manager")
diff --git a/tests/providers/openlineage/plugins/test_macros.py
b/tests/providers/openlineage/plugins/test_macros.py
new file mode 100644
index 0000000000..bea73628c1
--- /dev/null
+++ b/tests/providers/openlineage/plugins/test_macros.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import uuid
+from unittest import mock
+
+from airflow.providers.openlineage.plugins.adapter import _DAG_NAMESPACE
+from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
+
+
+def test_lineage_run_id():
+    task = mock.MagicMock(
+        dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
+    )
+ actual = lineage_run_id(task)
+ expected = str(
+ uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
+ )
+ )
+ assert actual == expected
+
+
+def test_lineage_parent_id():
+    task = mock.MagicMock(
+        dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
+    )
+ actual = lineage_parent_id(run_id="run_id", task_instance=task)
+ job_name = str(
+ uuid.uuid3(
+ uuid.NAMESPACE_URL,
+ f"{_DAG_NAMESPACE}.dag_id.task_id.execution_date.1",
+ )
+ )
+ expected = f"{_DAG_NAMESPACE}/{job_name}/run_id"
+ assert actual == expected
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 70d374b07e..cdec3de941 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -161,7 +161,7 @@ def test_emit_start_event(mock_stats_incr, mock_stats_timer):
                 },
             ),
             job=Job(
-                namespace="default",
+                namespace=_DAG_NAMESPACE,
                 name="job",
                 facets={"documentation": DocumentationJobFacet(description="description")},
             ),
@@ -222,18 +222,18 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat
                     ),
                     "parent": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                     "parentRun": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                    "externalQuery1": ExternalQueryRunFacet(externalQueryId="123", source="source"),
                    "externalQuery2": ExternalQueryRunFacet(externalQueryId="999", source="source"),
                },
            ),
            job=Job(
-                namespace="default",
+                namespace=_DAG_NAMESPACE,
                name="job",
                facets={
                    "documentation": DocumentationJobFacet(description="description"),
@@ -284,7 +284,7 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
             eventType=RunState.COMPLETE,
             eventTime=event_time,
             run=Run(runId=run_id, facets={}),
-            job=Job(namespace="default", name="job", facets={}),
+            job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
             producer=_PRODUCER,
             inputs=[],
             outputs=[],
@@ -329,16 +329,16 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
                 facets={
                     "parent": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                     "parentRun": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                     "externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"),
                 },
             ),
-            job=Job(namespace="default", name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
+            job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
             producer=_PRODUCER,
             inputs=[
                 Dataset(namespace="bigquery", name="a.b.c"),
@@ -377,7 +377,7 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
             eventType=RunState.FAIL,
             eventTime=event_time,
             run=Run(runId=run_id, facets={}),
-            job=Job(namespace="default", name="job", facets={}),
+            job=Job(namespace=_DAG_NAMESPACE, name="job", facets={}),
             producer=_PRODUCER,
             inputs=[],
             outputs=[],
@@ -422,16 +422,16 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
                 facets={
                     "parent": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                     "parentRun": ParentRunFacet(
                         run={"runId": "parent_run_id"},
-                        job={"namespace": "default", "name": "parent_job_name"},
+                        job={"namespace": _DAG_NAMESPACE, "name": "parent_job_name"},
                     ),
                     "externalQuery": ExternalQueryRunFacet(externalQueryId="123", source="source"),
                 },
             ),
-            job=Job(namespace="default", name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
+            job=Job(namespace=_DAG_NAMESPACE, name="job", facets={"sql": SqlJobFacet(query="SELECT 1;")}),
             producer=_PRODUCER,
             inputs=[
                 Dataset(namespace="bigquery", name="a.b.c"),
@@ -485,7 +485,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
                 )
             },
         ),
-        job=Job(namespace="default", name="dag_id", facets={}),
+        job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
         producer=_PRODUCER,
         inputs=[],
         outputs=[],
@@ -527,7 +527,7 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
         eventType=RunState.COMPLETE,
         eventTime=event_time.isoformat(),
         run=Run(runId=random_uuid, facets={}),
-        job=Job(namespace="default", name="dag_id", facets={}),
+        job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
         producer=_PRODUCER,
         inputs=[],
         outputs=[],
@@ -576,7 +576,7 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
                 )
             },
         ),
-        job=Job(namespace="default", name="dag_id", facets={}),
+        job=Job(namespace=_DAG_NAMESPACE, name="dag_id", facets={}),
         producer=_PRODUCER,
         inputs=[],
         outputs=[],
@@ -627,25 +627,23 @@ def test_build_dag_run_id_uses_correct_methods_underneath():


 def test_build_task_instance_run_id_is_valid_uuid():
-    task_id = "task_1"
-    execution_date = "2023-01-01"
-    try_number = 1
-    result = OpenLineageAdapter.build_task_instance_run_id(task_id, execution_date, try_number)
+    result = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task_1", "2023-01-01", 1)
     assert uuid.UUID(result)


 def test_build_task_instance_run_id_different_inputs_gives_different_results():
-    result1 = OpenLineageAdapter.build_task_instance_run_id("task1", "2023-01-01", 1)
-    result2 = OpenLineageAdapter.build_task_instance_run_id("task2", "2023-01-02", 2)
+    result1 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task1", "2023-01-01", 1)
+    result2 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task2", "2023-01-02", 2)
     assert result1 != result2


 def test_build_task_instance_run_id_uses_correct_methods_underneath():
+    dag_id = "dag_1"
     task_id = "task_1"
     execution_date = "2023-01-01"
     try_number = 1
     expected = str(
-        uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}")
+        uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}")
     )
-    actual = OpenLineageAdapter.build_task_instance_run_id(task_id, execution_date, try_number)
+    actual = OpenLineageAdapter.build_task_instance_run_id(dag_id, task_id, execution_date, try_number)
     assert actual == expected