kacpermuda commented on code in PR #49237:
URL: https://github.com/apache/airflow/pull/49237#discussion_r2050421329
##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -126,6 +127,24 @@ def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
+def get_task_parent_run_id(
Review Comment:
```suggestion
def get_task_parent_run_facet(
```
##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -126,6 +127,24 @@ def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
+def get_task_parent_run_id(
+ parent_run_id: str, parent_job_name: str, parent_job_namespace: str =
conf.namespace()
+) -> dict[str, Any]:
+ # Gets parent run id for TASK level events
+ # Those right now will always point to the DAG level run it and name
+ # As we don't handle dags runs from external events yet
Review Comment:
Maybe let's do it as a proper docstring? I think we can also be more
descriptive here: the parent information will also be used as the root parent
information because, for now, we do not track DAG parents, so there is no
"recursive" check.
##########
providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py:
##########
@@ -637,7 +665,17 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
nominal_end_time=event_time.isoformat(),
owners=["airflow"],
description=dag.description,
- run_facets={"airflowDagRun": dag_run_facet},
+ run_facets={
+ "parent": parent_run.ParentRunFacet(
+ run=parent_run.Run(runId=random_uuid),
+ job=parent_run.Job(namespace=namespace(),
name="parent_job_name"),
+ root=parent_run.Root(
+ run=parent_run.RootRun(runId=random_uuid),
+ job=parent_run.RootJob(namespace=namespace(),
name="parent_job_name"),
+ ),
+ ),
+ "airflowDagRun": dag_run_facet,
+ },
Review Comment:
If we're testing it for dag_start, let's add it to dag_success and
dag_failed as well. It's not really testing much, just that the run facets
passed to the method are included in the event, but it's probably still worth it.
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py:
##########
@@ -504,11 +502,12 @@ def on_state_change():
adapter_kwargs = {
"run_id": task_uuid,
"job_name": get_job_name(ti),
- "parent_job_name": dagrun.dag_id,
- "parent_run_id": parent_run_id,
"end_time": end_date.isoformat(),
"task": OperatorLineage(),
- "run_facets": get_airflow_debug_facet(),
+ "run_facets": {
+ **get_task_parent_run_id(ti.dag_id, parent_run_id),
Review Comment:
```suggestion
**get_task_parent_run_id(parent_run_id=parent_run_id,
parent_job_name=ti.dag_id),
```
I think the order here was wrong, so it would not work. Maybe some UUID
validation should be added to catch those kinds of errors, or we should only
allow kwargs?
##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -126,6 +127,24 @@ def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"
+def get_task_parent_run_id(
Review Comment:
Let's be consistent in naming with other utils functions that return facets.
##########
providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py:
##########
@@ -101,3 +87,44 @@ def lineage_parent_id(task_instance: TaskInstance):
lineage_run_id(task_instance),
)
)
+
+
+def lineage_root_parent_id(task_instance: TaskInstance):
+ return "/".join(
+ (
+ lineage_job_namespace(),
+ lineage_root_job_name(task_instance),
+ lineage_root_run_id(task_instance),
+ )
+ )
+
+
+def lineage_root_job_name(task_instance: TaskInstance):
+ return task_instance.dag_id
+
+
+def lineage_root_run_id(task_instance: TaskInstance):
+ return OpenLineageAdapter.build_dag_run_id(
+ dag_id=task_instance.dag_id,
+ logical_date=_get_logical_date(task_instance),
+ clear_number=task_instance.dag_run.clear_number,
+ )
Review Comment:
How do you think we'll make it work once we add parent run facets to DAGs as
well (e.g. when a DAG is triggered by a task from another DAG)? Can we already
adjust for it somehow to avoid changing the macros later on?
##########
providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py:
##########
@@ -148,10 +148,19 @@ async def get_artifacts_for_steps(steps, artifacts):
map_index=task_instance.map_index,
)
+ root_parent_run_id = OpenLineageAdapter.build_dag_run_id(
+ dag_id=task_instance.dag_id,
+ logical_date=_get_logical_date(task_instance),
+ clear_number=task_instance.dag_run.clear_number,
+ )
+
parent_job = ParentRunMetadata(
run_id=parent_run_id,
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
job_namespace=namespace(),
+ root_parent_run_id=root_parent_run_id,
+ root_parent_job_name=task_instance.dag_id,
+ root_parent_job_namespace=namespace(),
Review Comment:
I think we need to bump the OL provider version in the decorator for this
function to 2.2.0; otherwise, passing root_* args here will cause an error, as
we are not sure if OL common is >= 1.31.0. It should be:
`@require_openlineage_version(provider_min_version="2.2.0")`
##########
providers/openlineage/tests/unit/openlineage/plugins/test_execution.py:
##########
@@ -99,22 +96,15 @@ def setup_job(self, task_name, run_id):
dag = dagbag.dags.get("test_openlineage_execution")
task = dag.get_task(task_name)
- if AIRFLOW_V_3_0_PLUS:
- dagrun_kwargs = {
- "logical_date": DEFAULT_DATE,
- "run_after": DEFAULT_DATE,
- "triggered_by": DagRunTriggeredByType.TEST,
- }
- else:
- dagrun_kwargs = {"execution_date": DEFAULT_DATE}
Review Comment:
Why is this change needed? Seems unrelated. Did something change in tests?
##########
providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py:
##########
@@ -154,13 +164,21 @@ def _get_parent_run_facet(task_instance):
from airflow.providers.openlineage.conf import namespace
parent_run_id = _get_ol_run_id(task_instance)
+ root_parent_run_id = _get_ol_dag_run_id(task_instance)
return parent_run.ParentRunFacet(
run=parent_run.Run(runId=parent_run_id),
job=parent_run.Job(
namespace=namespace(),
name=f"{task_instance.dag_id}.{task_instance.task_id}",
),
+ root=parent_run.Root(
+ run=parent_run.RootRun(runId=root_parent_run_id),
+ job=parent_run.RootJob(
+ name=task_instance.dag_id,
+ namespace=namespace(),
+ ),
+ ),
Review Comment:
Same as in dbt provider, I think we need to bump the OL provider requirement
in decorator of the main function.
`@require_openlineage_version(provider_min_version="2.2.0")`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]