anandhimurali commented on code in PR #38982:
URL: https://github.com/apache/airflow/pull/38982#discussion_r1666420076
##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -78,11 +78,38 @@ def get_job_name(task: TaskInstance) -> str:
def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str,
Any]:
+ from airflow.providers.openlineage.extractors.manager import
try_import_from_string
+
custom_facets = {}
# check for -1 comes from SmartSensor compatibility with dynamic task
mapping
# this comes from Airflow code
if hasattr(task_instance, "map_index") and getattr(task_instance,
"map_index") != -1:
custom_facets["airflow_mappedTask"] =
AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+ # Append custom run facets by executing the custom_run_facets.
Review Comment:
>I believe we need to decide whether custom run facets should receive the
task before or after execution. Users might want to use elements like query_id
generated during execution in their custom facets. While I don't have a strong
preference either way, it's important to note this decision consciously in the
documentation.
Since the
[get_custom_facets](https://github.com/apache/airflow/blob/9d92feda7fae0f014743153ce69656eb339fc420/airflow/providers/openlineage/plugins/listener.py#L166)
is invoked only during the OpenLineage START event generation, only the state
at the start of the TaskInstance is available to the custom facet functions.
However, if the state after task execution needs to be made available to the
functions, it can be only be done during the COMPLETE or other adhoc lineage
events? IMO, the scope of changes will be larger for this and can be taken as a
follow-up PR. Let me know what you think.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]