kacpermuda commented on code in PR #38982:
URL: https://github.com/apache/airflow/pull/38982#discussion_r1665780045


##########
airflow/providers/openlineage/provider.yaml:
##########
@@ -104,6 +104,13 @@ config:
         example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
         default: ~
         version_added: ~
+      custom_run_facets:
+        description: |
+          Register custom run facet functions by passing a string of semicolon separated full import paths.
+        type: string
+        example: full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
+        default: ''
+        version_added: 1.8.0

Review Comment:
   This will probably be 1.10.0, as 1.9.0 should be released before this gets merged.
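For reference, the option described above takes a semicolon-separated string of full import paths and defaults to ''. Below is a minimal sketch of how such a value could be split and resolved. It assumes two hypothetical helpers, `parse_custom_run_facets` and `import_from_string`. The PR itself relies on `conf.custom_run_facets()` and `try_import_from_string`, whose exact behaviour may differ:

```python
from __future__ import annotations

import importlib
from typing import Callable


def parse_custom_run_facets(raw: str | None) -> list[str]:
    """Split a semicolon-separated string of import paths, ignoring blanks."""
    if not raw:
        return []  # covers the '' default as well as a missing value
    return [path.strip() for path in raw.split(";") if path.strip()]


def import_from_string(path: str) -> Callable | None:
    """Import ``package.module.attr`` from a dotted path; return None on failure."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        return None  # not a full import path
    try:
        module = importlib.import_module(module_path)
        return getattr(module, attr_name)
    except (ImportError, AttributeError):
        return None


# Hypothetical config value: one unimportable path, one real stdlib function.
raw_value = "full.path.to.custom_facet_function; os.path.basename ;"
paths = parse_custom_run_facets(raw_value)
funcs = {path: import_from_string(path) for path in paths}
```

This sketch assumes the option lives in the provider's `[openlineage]` config section. On that assumption, it could be set in `airflow.cfg` or through the `AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS` environment variable, following Airflow's `AIRFLOW__{SECTION}__{KEY}` convention.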



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -78,11 +78,38 @@ def get_job_name(task: TaskInstance) -> str:
 
 
 def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
+    from airflow.providers.openlineage.extractors.manager import try_import_from_string
+
     custom_facets = {}
     # check for -1 comes from SmartSensor compatibility with dynamic task mapping
     # this comes from Airflow code
     if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
         custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+    # Append custom run facets by executing the custom_run_facets.

Review Comment:
   This is a more general comment, but currently the `get_custom_facets()` function is called only once, after the task instance has started.
   
   I believe we need to decide whether custom run facets should receive the task before or after execution. Users might want to use elements like `query_id` generated during execution in their custom facets. While I don't have a strong preference either way, it's important to make this decision consciously and note it in the documentation.



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -78,11 +78,38 @@ def get_job_name(task: TaskInstance) -> str:
 
 
 def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
+    from airflow.providers.openlineage.extractors.manager import try_import_from_string
+
     custom_facets = {}
     # check for -1 comes from SmartSensor compatibility with dynamic task mapping
     # this comes from Airflow code
     if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
         custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+    # Append custom run facets by executing the custom_run_facets.
+    for custom_facet_func in conf.custom_run_facets():
+        func: type[function] | None = try_import_from_string(custom_facet_func)
+        if not func:
+            log.warning(
+                "OpenLineage is unable to import custom facet function `%s`; will ignore it.",
+                custom_facet_func,
+            )
+            continue
+        facet: dict[str, BaseFacet] = func(task_instance)

Review Comment:
   And one more question / decision to make: should we wrap the function call with try/except to avoid crashing the whole event emission, or should we let it crash if the user provided a function that breaks? I would vote for putting a try/except with some descriptive logging here.
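Below is a minimal sketch of the try/except option. It uses a hypothetical `collect_custom_run_facets` helper that receives already-imported functions keyed by import path. Plain dicts stand in for facet objects, and skipping `None`/`{}` mirrors the return-value rules in the docs hunk:

```python
from __future__ import annotations

import logging
from typing import Any, Callable, Optional

log = logging.getLogger(__name__)

# A facet function takes the task instance and returns a dict of facets or None.
FacetFunc = Callable[[Any], Optional[dict]]


def collect_custom_run_facets(funcs: dict[str, FacetFunc], task_instance: Any) -> dict[str, Any]:
    """Call each registered facet function so that one failure cannot stop the rest."""
    custom_facets: dict[str, Any] = {}
    for path, func in funcs.items():
        try:
            result = func(task_instance)
        except Exception:
            # log.exception records the traceback; the event is still emitted.
            log.exception("OpenLineage custom facet function `%s` failed; skipping it.", path)
            continue
        if result:  # None and {} both mean "nothing to add"
            custom_facets.update(result)
    return custom_facets


def good_facet(ti: Any) -> dict:
    return {"myFacet": {"dagId": ti}}


def broken_facet(ti: Any) -> dict:
    raise ValueError("bug in user code")


collected = collect_custom_run_facets(
    {"m.good_facet": good_facet, "m.none_facet": lambda ti: None, "m.broken_facet": broken_facet},
    task_instance="example_dag",
)
```

Catching `Exception` rather than `BaseException` lets `KeyboardInterrupt` and `SystemExit` propagate. Any ordinary bug in user code is logged with its traceback instead of breaking event emission.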



##########
docs/apache-airflow-providers-openlineage/guides/developer.rst:
##########
@@ -446,15 +446,96 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo
 
 .. _custom_facets:openlineage:
 
-Custom facets
+Custom Facets
 =============
 To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
-Also check out `available Facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
+Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
 
 The OpenLineage spec might not contain all the facets you need to write your extractor,
 in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
 More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.
 
+Custom Run Facets
+-----------------
+
+You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.
+
+Steps to be taken,
+
+1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
+2. Register the functions using the ``custom_run_facets`` Airflow configuration.
+
+Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
+and append their return values to the run facet in the lineage event.
+
+Writing a custom facet function
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
+- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet.
+- **Return value:** The custom facet to be added to the lineage event. Return type should be ``None`` or ``dict``. You may choose to return ``None`` or ``{}``, if you do not want to add custom facets for certain criteria.
+
+**Example custom facet function**
+
+.. code-block:: python
+
+    import attrs
+    from airflow.models import TaskInstance
+    from openlineage.client.facet import BaseFacet
+
+
+    @attrs.define(slots=False)
+    class MyCustomRunFacet(BaseFacet):
+        """Define a custom facet."""
+
+        name: str
+        jobState: str
+        uniqueName: str
+        displayName: str
+        dagId: str
+        taskId: str
+        cluster: str
+
+
+    def get_my_custom_facet(task_instance: TaskInstance):
+        job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"

Review Comment:
   ```suggestion
       def get_my_custom_facet(task_instance: TaskInstance):
           operator_name = task_instance.task.operator_name
           if operator_name == "BashOperator":
               return None
           job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
   ```
   
   Including a simple example here would make it easier for users to add conditional facets.



##########
airflow/providers/openlineage/utils/utils.py:
##########
@@ -78,11 +78,38 @@ def get_job_name(task: TaskInstance) -> str:
 
 
 def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
+    from airflow.providers.openlineage.extractors.manager import try_import_from_string
+
     custom_facets = {}
     # check for -1 comes from SmartSensor compatibility with dynamic task mapping
     # this comes from Airflow code
     if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
         custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
+
+    # Append custom run facets by executing the custom_run_facets.
+    for custom_facet_func in conf.custom_run_facets():
+        func: type[function] | None = try_import_from_string(custom_facet_func)
+        if not func:
+            log.warning(
+                "OpenLineage is unable to import custom facet function `%s`; will ignore it.",
+                custom_facet_func,
+            )
+            continue
+        facet: dict[str, BaseFacet] = func(task_instance)

Review Comment:
   I noticed that the examples here use `attrs.asdict()`, so they provide the facets as dicts, but here we actually expect `type[BaseFacet]`. Could you adjust the typing and the examples so that they match?
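Below is a minimal sketch of one way to make them match. It assumes facet functions return facet objects (as `AirflowMappedTaskRunFacet.from_task_instance` does above) rather than `attrs.asdict()` output. `BaseFacet` here is a stand-in dataclass, not the real `openlineage.client.facet.BaseFacet`, and `check_facets` is a hypothetical helper. `Callable` also replaces `type[function]`, since `function` is not a builtin name:

```python
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


@dataclass
class BaseFacet:
    """Stand-in for openlineage.client.facet.BaseFacet in this sketch."""


@dataclass
class MyCustomRunFacet(BaseFacet):
    dagId: str
    taskId: str


# `Callable` instead of `type[function]`: the config lists functions, not classes.
CustomFacetFunc = Callable[[Any], Optional[Dict[str, BaseFacet]]]


def get_my_custom_facet(task_instance: Any) -> dict[str, BaseFacet] | None:
    # Return the facet object itself, not attrs.asdict(...) of it.
    return {"myCustomRunFacet": MyCustomRunFacet(dagId=task_instance.dag_id, taskId=task_instance.task_id)}


def check_facets(result: object) -> dict[str, BaseFacet]:
    """Normalize None to {} and reject values that are not facet objects."""
    if result is None:
        return {}
    if not isinstance(result, dict) or not all(isinstance(v, BaseFacet) for v in result.values()):
        raise TypeError(f"expected dict[str, BaseFacet] or None, got {result!r}")
    return result


func: CustomFacetFunc = get_my_custom_facet
facets = check_facets(func(SimpleNamespace(dag_id="my_dag", task_id="my_task")))
```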



##########
docs/apache-airflow-providers-openlineage/guides/developer.rst:
##########
@@ -446,15 +446,96 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo
 
 .. _custom_facets:openlineage:
 
-Custom facets
+Custom Facets
 =============
 To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
-Also check out `available Facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
+Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
 
 The OpenLineage spec might not contain all the facets you need to write your extractor,
 in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
 More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.
 
+Custom Run Facets
+-----------------
+
+You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.
+
+Steps to be taken,
+
+1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
+2. Register the functions using the ``custom_run_facets`` Airflow configuration.
+
+Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
+and append their return values to the run facet in the lineage event.
+
+Writing a custom facet function
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
+- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet.
+- **Return value:** The custom facet to be added to the lineage event. Return type should be ``None`` or ``dict``. You may choose to return ``None`` or ``{}``, if you do not want to add custom facets for certain criteria.
+
+**Example custom facet function**
+
+.. code-block:: python
+
+    import attrs
+    from airflow.models import TaskInstance
+    from openlineage.client.facet import BaseFacet
+
+
+    @attrs.define(slots=False)
+    class MyCustomRunFacet(BaseFacet):
+        """Define a custom facet."""
+
+        name: str
+        jobState: str
+        uniqueName: str
+        displayName: str
+        dagId: str
+        taskId: str
+        cluster: str
+
+
+    def get_my_custom_facet(task_instance: TaskInstance):
+        job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"

Review Comment:
   If we decide that this function will receive the task after execution, we can also show in the example how to retrieve some attribute from the operator.
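Below is a minimal sketch of such an example. It assumes the function runs after execution and reads a hypothetical `query_id` attribute (the one mentioned in the earlier comment) from the operator via `task_instance.task`. `SimpleNamespace` objects stand in for real task instances, and `QueryIdRunFacet` stands in for a `BaseFacet` subclass:

```python
from __future__ import annotations

from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class QueryIdRunFacet:
    """Stand-in for a BaseFacet subclass carrying a query id."""

    queryId: str


def get_query_id_facet(task_instance: Any) -> dict[str, QueryIdRunFacet] | None:
    # `task_instance.task` is the operator; `query_id` is a hypothetical attribute
    # that an operator may only set while executing, so default to None.
    query_id = getattr(task_instance.task, "query_id", None)
    if query_id is None:
        return None  # nothing to add, e.g. when called before execution
    return {"queryId": QueryIdRunFacet(queryId=str(query_id))}


before_run = SimpleNamespace(task=SimpleNamespace())
after_run = SimpleNamespace(task=SimpleNamespace(query_id="01b2c3"))
```

Using `getattr` with a default keeps the same function safe to register whichever way the before/after decision goes.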



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
