kacpermuda commented on code in PR #38982:
URL: https://github.com/apache/airflow/pull/38982#discussion_r1579417082
##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -61,11 +61,20 @@ def get_job_name(task: TaskInstance) -> str:
def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
+ from airflow.providers.openlineage.extractors.manager import try_import_from_string
+
custom_facets = {}
# check for -1 comes from SmartSensor compatibility with dynamic task mapping
# this comes from Airflow code
if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+ # Append custom run facets by executing the custom_facet_functions.
+ for custom_facet_func in conf.custom_facet_functions():
+ func: type[function] = try_import_from_string(custom_facet_func)
+ facet = func(task_instance) if func else None
+ if facet and isinstance(facet, dict):
+ custom_facets.update(facet)
Review Comment:
I would add a check for duplicates here (check whether we are overwriting an
existing facet and maybe log a message about it), and possibly also disallow
some predefined facet names (dict keys), so that users cannot accidentally
overwrite base facets. Or maybe we should allow overwriting and give the user
full control? WDYT @mobuchowski @JDarDagran
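A minimal sketch of what the duplicate check and reserved-name guard could look like, written as a standalone helper so it runs without Airflow. `merge_custom_facets` and `RESERVED_FACET_KEYS` are hypothetical names, the task instance is typed as `Any`, and only `airflow_mappedTask` (taken from the diff) is treated as reserved:

```python
from __future__ import annotations

import logging
from typing import Any, Callable, Iterable, Optional

log = logging.getLogger(__name__)

# Hypothetical: facet keys set by the provider itself that user functions
# must not replace. "airflow_mappedTask" is the key used in get_custom_facets.
RESERVED_FACET_KEYS = frozenset({"airflow_mappedTask"})


def merge_custom_facets(
    custom_facets: dict[str, Any],
    facet_funcs: Iterable[Optional[Callable[[Any], Any]]],
    task_instance: Any,
) -> dict[str, Any]:
    """Merge user facets into custom_facets, guarding reserved keys and logging overwrites."""
    for func in facet_funcs:
        if func is None:  # import failed upstream
            continue
        facet = func(task_instance)
        if not isinstance(facet, dict):
            continue
        for key, value in facet.items():
            if key in RESERVED_FACET_KEYS:
                log.warning("Custom facet `%s` uses a reserved name and is ignored.", key)
                continue
            if key in custom_facets:
                log.warning("Custom facet `%s` overwrites an existing facet.", key)
            custom_facets[key] = value
    return custom_facets
```

This variant logs and keeps the last value when two user functions return the same key, but never lets them replace reserved keys; giving the user full control would just mean dropping the reserved-key branch.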
##########
airflow/providers/openlineage/conf.py:
##########
@@ -65,6 +65,19 @@ def custom_extractors() -> set[str]:
return set(extractor.strip() for extractor in option.split(";") if extractor.strip())
+@cache
+def custom_facet_functions() -> set[str]:
+ """[openlineage] custom_facet_functions."""
+ option = conf.get(_CONFIG_SECTION, "custom_facet_functions", fallback="")
+ if not option:
+ option = os.getenv("OPENLINEAGE_CUSTOM_FACET_FUNCTIONS", "")
Review Comment:
I don't think the `OPENLINEAGE_CUSTOM_FACET_FUNCTIONS` variable is necessary;
we can stick to the Airflow config. Some other configs still read `OPENLINEAGE_`
variables because they are legacy solutions that we need to keep supporting,
but new features can omit them.
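Without the env var fallback the function reduces to a single config read. A sketch, assuming a stdlib `configparser.ConfigParser` as a stand-in for Airflow's `conf` (passed in as an argument so it runs standalone, which is also why the provider's `@cache` is dropped here):

```python
from __future__ import annotations

import configparser

_CONFIG_SECTION = "openlineage"


def custom_facet_functions(conf: configparser.ConfigParser) -> set[str]:
    """[openlineage] custom_facet_functions, read from config only (no env var fallback)."""
    # fallback="" covers both a missing section and a missing option.
    option = conf.get(_CONFIG_SECTION, "custom_facet_functions", fallback="")
    # Same parsing as custom_extractors: split on ";" and drop blank entries.
    return {path.strip() for path in option.split(";") if path.strip()}
```

Users can still set the value through the environment using Airflow's usual `AIRFLOW__OPENLINEAGE__CUSTOM_FACET_FUNCTIONS` convention, so a provider-specific variable adds nothing.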
##########
airflow/providers/openlineage/provider.yaml:
##########
@@ -101,6 +101,15 @@ config:
example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
default: ~
version_added: ~
+ custom_facet_functions:
+ description: |
+ Register functions that return custom facets to append to the run facet. This config takes a
+ string of semicolon delimited full import paths to the functions. The functions should accept
+ a TaskInstance input argument and should return a dictionary.
Review Comment:
I'm not sure we should include this here, as it's an implementation detail. If
we change the input parameters, it would be easy to forget to update the yaml,
but maybe I'm wrong?
> The functions should accept a TaskInstance input argument and return a dictionary.
##########
airflow/providers/openlineage/provider.yaml:
##########
@@ -101,6 +101,15 @@ config:
example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
default: ~
version_added: ~
+ custom_facet_functions:
+ description: |
+ Register functions that return custom facets to append to the run facet. This config takes a
+ string of semicolon delimited full import paths to the functions. The functions should accept
+ a TaskInstance input argument and should return a dictionary.
Review Comment:
```suggestion
Register custom facet functions that return facets to be appended as
run facets, by passing a string of semicolon-separated full import paths.
```
##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -61,11 +61,20 @@ def get_job_name(task: TaskInstance) -> str:
def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
+ from airflow.providers.openlineage.extractors.manager import try_import_from_string
+
custom_facets = {}
# check for -1 comes from SmartSensor compatibility with dynamic task mapping
# this comes from Airflow code
if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+ # Append custom run facets by executing the custom_facet_functions.
+ for custom_facet_func in conf.custom_facet_functions():
+ func: type[function] = try_import_from_string(custom_facet_func)
Review Comment:
`func` can also be `None`, so maybe we should include that in the type hint?
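A sketch of the hint with `None` included. Note that `type[function]` describes a class object rather than a callable, so `Callable[...]` is the more usual hint. `CustomFacetFunc` is a hypothetical alias, `Any` stands in for `TaskInstance`, and `try_import_callable` is a hypothetical importlib-based stand-in for `try_import_from_string` that returns `None` when the import fails (the real helper's behaviour may differ):

```python
from __future__ import annotations

import importlib
from typing import Any, Callable, Optional

# Hypothetical alias: takes a task instance, returns a facet dict or None.
CustomFacetFunc = Callable[[Any], Optional[dict]]


def try_import_callable(path: str) -> Optional[CustomFacetFunc]:
    """Hypothetical stand-in for try_import_from_string: None if the import fails."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        return None
    try:
        obj = getattr(importlib.import_module(module_path), attr)
    except (ImportError, AttributeError):
        return None
    return obj if callable(obj) else None


def collect_facets(paths: list[str], task_instance: Any) -> dict[str, Any]:
    facets: dict[str, Any] = {}
    for path in paths:
        # The Optional in the hint makes the None check below explicit for type checkers.
        func: CustomFacetFunc | None = try_import_callable(path)
        if func is None:
            continue
        facet = func(task_instance)
        if isinstance(facet, dict):
            facets.update(facet)
    return facets
```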
--
This is an automated message from the Apache Git Service.