JDarDagran commented on code in PR #38982:
URL: https://github.com/apache/airflow/pull/38982#discussion_r1686210312
########## tests/providers/openlineage/utils/custom_facet_fixture.py: ##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import attrs
+from openlineage.client.facet import BaseFacet
+
+if TYPE_CHECKING:
+    from airflow.models import TaskInstance
+
+
+@attrs.define(slots=False)
+class MyCustomRunFacet(BaseFacet):
+    """Define a custom run facet."""
+
+    name: str
+    jobState: str
+    uniqueName: str
+    displayName: str
+    dagId: str
+    taskId: str
+    cluster: str
+
+
+def get_additional_test_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
+    operator_name = task_instance.task.operator_name if task_instance.task else None
+    if operator_name == "BashOperator":
+        return None
+    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+    return {
+        "additional_run_facet": attrs.asdict(

Review Comment:
nit: same as in docs comment, no need for that

########## docs/apache-airflow-providers-openlineage/guides/developer.rst: ##########
@@ -446,15 +446,100 @@ Conversion from Airflow Table entity to OpenLineage Dataset is made in the follo

 .. _custom_facets:openlineage:

-Custom facets
+Custom Facets
 =============

 To learn more about facets in OpenLineage, please refer to `facet documentation <https://openlineage.io/docs/spec/facets/>`_.
-Also check out `available Facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_
+Also check out `available facets <https://github.com/OpenLineage/OpenLineage/blob/main/client/python/openlineage/client/facet.py>`_

 The OpenLineage spec might not contain all the facets you need to write your extractor, in which case you will have to make your own `custom facets <https://openlineage.io/docs/spec/facets/custom-facets>`_.
 More on creating custom facets can be found `here <https://openlineage.io/blog/extending-with-facets/>`_.

+Custom Run Facets
+=================
+
+You can inject your own custom facets in the lineage event's run facet using the ``custom_run_facets`` Airflow configuration.
+
+Steps to be taken,
+
+1. Write a function that returns the custom facet. You can write as many custom facet functions as needed.
+2. Register the functions using the ``custom_run_facets`` Airflow configuration.
+
+Once done, Airflow OpenLineage listener will automatically execute these functions during the lineage event generation
+and append their return values to the run facet in the lineage event.
+
+Writing a custom facet function
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- **Input arguments:** The function should accept the ``TaskInstance`` as an input argument.
+- **Function body:** Perform the logic needed to generate the custom facet. The custom facet should inherit from the ``BaseFacet`` for the ``_producer`` and ``_schemaURL`` to be automatically added for the facet.
+- **Return value:** The custom facet to be added to the lineage event. Return type should be ``dict[str, dict]`` or ``None``. You may choose to return ``None``, if you do not want to add custom facets for certain criteria.
+
+**Example custom facet function**
+
+.. code-block:: python
+
+    import attrs
+    from airflow.models import TaskInstance
+    from openlineage.client.facet import BaseFacet
+
+
+    @attrs.define(slots=False)
+    class MyCustomRunFacet(BaseFacet):
+        """Define a custom facet."""
+
+        name: str
+        jobState: str
+        uniqueName: str
+        displayName: str
+        dagId: str
+        taskId: str
+        cluster: str
+
+
+    def get_my_custom_facet(task_instance: TaskInstance) -> dict[str, dict] | None:
+        operator_name = task_instance.task.operator_name
+        if operator_name == "BashOperator":
+            return
+        job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+        return {
+            "additional_run_facet": attrs.asdict(

Review Comment:
nit: there's no need to translate to dict here, OL client Serde takes care of it
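Both nits make the same point. The `attrs.asdict(...)` call is redundant because the OpenLineage client's `Serde` already serializes facet objects when the event is emitted. The sketch below illustrates that reasoning with the standard library only, so it runs without Airflow or OpenLineage. It rests on two assumptions. First, `dataclasses` stands in for `attrs`. Second, `serialize` is a hypothetical stand-in for `Serde` and is not its real implementation. If the serializer already converts facet objects, then passing it the facet object or a pre-converted dict yields identical JSON.

```python
import dataclasses
import json
from typing import Any


@dataclasses.dataclass
class MyCustomRunFacet:
    """Stand-in for the attrs-based facet in the diff (dataclass instead of attrs)."""

    name: str
    jobState: str
    dagId: str
    taskId: str


def serialize(value: Any) -> str:
    """Hypothetical stand-in for the OpenLineage client's Serde.

    Encodes to JSON, converting dataclass instances (also when nested in dicts).
    """

    def default(obj: Any) -> Any:
        # json.dumps calls this only for objects it cannot encode natively.
        if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
            return dataclasses.asdict(obj)
        raise TypeError(f"Cannot serialize {type(obj).__name__}")

    return json.dumps(value, default=default, sort_keys=True)


facet = MyCustomRunFacet(name="test", jobState="running", dagId="my_dag", taskId="my_task")

# What the diff does: convert the facet to a dict before returning it.
pre_converted = {"additional_run_facet": dataclasses.asdict(facet)}
# What the reviewer suggests: return the facet object and let the serializer handle it.
as_object = {"additional_run_facet": facet}

print(serialize(pre_converted) == serialize(as_object))  # True
```

In the real fixture and docs example, following the nit would presumably mean returning the ``MyCustomRunFacet`` instance directly and adjusting the ``dict[str, dict]`` return annotation. That ``Serde`` handles attrs-based facets this way is the reviewer's claim; this sketch does not verify it.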
