kacpermuda commented on code in PR #37620:
URL: https://github.com/apache/airflow/pull/37620#discussion_r1506029657
##########
docs/apache-airflow-providers-openlineage/guides/developer.rst:
##########
@@ -104,26 +166,330 @@ Here's example of properly implemented
``get_openlineage_facets_on_complete`` me
],
)
+For more examples of implemented OpenLineage methods, check out the source
code of :ref:`supported_classes:openlineage`.
-How to add tests to OpenLineage integration?
-============================================
+.. _custom_extractors:openlineage:
-Unit testing OpenLineage integration in operators is very similar to testing
operators itself.
-Objective of those tests is making sure the ``get_openlineage_*`` methods
return proper ``OperatorLineage``
-data structure with relevant fields filled. It's recommended to mock any
external calls.
-Authors of tests need to remember the condition of calling different OL
methods is different.
-``get_openlineage_facets_on_start`` is called before ``execute``, and as such,
must not depend on values
-that are set there.
+Custom Extractors
+=================
-System testing OpenLineage integration relies on the existing system test
framework.
-There is special ``VariableTransport`` that gathers OpenLineage events in
Airflow database,
-and ``OpenLineageTestOperator`` that compares those events to expected ones.
Objective of author
-of OpenLineage system test is to provide expected dictionary of event keys and
events to ``OpenLineageTestOperator``.
+This approach is recommended when dealing with Operators that you cannot modify (e.g. third-party providers), but from which you still want lineage to be extracted.
+If you want to extract lineage from your own Operators, you may prefer
directly implementing OpenLineage methods as described in
:ref:`openlineage_methods:openlineage`.
-Event keys identify event send from particular operator and method: they have
structure ``<dag_id>.<task_id>.event.<event_type>``;
-it's always possible to identify particular event send from particular task
this way.
+This approach works by detecting which Airflow Operators your DAG is using, and extracting lineage data from them using the corresponding Extractor class.
-The provided event structure does not have to contain all the fields that are
in the resulting event.
-Only the fields provided by test author are compared; this allows to check
only for fields particular
-test cares about. It also allows to skip fields that are (semi) randomly
generated, like ``runId`` or ``eventTime``,
-or just always the same in context of OpenLineage in Airflow, like
``producer``.
+Interface
+^^^^^^^^^
+
+Custom Extractors have to derive from :class:`BaseExtractor
<airflow.providers.openlineage.extractors.base.BaseExtractor>`
+and implement at least two methods: ``_execute_extraction`` and
``get_operator_classnames``.
+
+``BaseExtractor`` defines two methods, ``extract`` and ``extract_on_complete``, that are called and used to provide the actual lineage data.
+The difference is that ``extract`` is called before the Operator's ``execute`` method, while ``extract_on_complete`` is called after.
+By default, ``extract`` calls the ``_execute_extraction`` method implemented in the custom Extractor, and ``extract_on_complete``
+calls the ``extract`` method. If you want to provide some additional information available after the task execution, you can
+override the ``extract_on_complete`` method. This can be used to extract any additional information that the Operator
+sets on its own properties. A good example is ``SnowflakeOperator``, which sets ``query_ids`` after execution.
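The call flow described above can be sketched with plain-Python stand-ins. This is a hypothetical sketch: real Extractors subclass ``airflow.providers.openlineage.extractors.base.BaseExtractor``, and ``QueryIdExtractor`` and the stand-in classes below are illustrative only.

```python
class OperatorLineage:
    """Stand-in for the provider's OperatorLineage structure."""

    def __init__(self):
        self.run_facets = {}


class BaseExtractorSketch:
    """Stand-in mirroring the default extract / extract_on_complete flow."""

    def __init__(self, operator):
        self.operator = operator

    def _execute_extraction(self):
        raise NotImplementedError

    def extract(self):
        # Default behavior: delegate to the custom _execute_extraction.
        return self._execute_extraction()

    def extract_on_complete(self):
        # Default behavior: same data as extract; override to add post-execution info.
        return self.extract()


class QueryIdExtractor(BaseExtractorSketch):
    """Hypothetical Extractor adding info the Operator sets during execute()."""

    def _execute_extraction(self):
        return OperatorLineage()

    def extract_on_complete(self):
        lineage = self.extract()
        # query_ids is only populated after execution, e.g. by SnowflakeOperator.
        lineage.run_facets["query_ids"] = getattr(self.operator, "query_ids", [])
        return lineage
```

The override reuses ``extract()`` for everything known up front and only appends the post-execution data on top.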
+
+``get_operator_classnames`` is a classmethod used to provide the list of Operators that your Extractor can get lineage from.
+
+For example:
+
+.. code-block:: python
+
+    @classmethod
+    def get_operator_classnames(cls) -> list[str]:
+        return ["PostgresOperator"]
+
+If the name of the Operator matches one of the names on the list, the Extractor will be instantiated - with the Operator available in the Extractor's ``self.operator`` property - and both ``extract`` and ``extract_on_complete`` methods will be called.
+
+Both methods return the ``OperatorLineage`` structure:
+
+.. code-block:: python
+
+    @define
+    class OperatorLineage:
+        """Structure returned from lineage extraction."""
+
+        inputs: list[Dataset] = Factory(list)
+        outputs: list[Dataset] = Factory(list)
+        run_facets: dict[str, BaseFacet] = Factory(dict)
+        job_facets: dict[str, BaseFacet] = Factory(dict)
+
+
+Inputs and outputs are lists of plain OpenLineage datasets (``openlineage.client.run.Dataset``).
+
+``run_facets`` and ``job_facets`` are dictionaries of optional RunFacets and JobFacets that will be attached to the run and the job respectively - for example,
+you might want to attach a ``SqlJobFacet`` if your Operator is executing SQL.
+
+To learn more about facets in OpenLineage see :ref:`custom_facets:openlineage`.
+
+Registering Custom Extractor
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The OpenLineage integration does not know that you've provided an Extractor unless you register it.
+
+You can do so using the ``extractors`` option in the Airflow configuration.
+
+.. code-block:: ini
+
+  [openlineage]
+  transport = {"type": "http", "url": "http://example.com:5000"}
+  extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
+
+The ``AIRFLOW__OPENLINEAGE__EXTRACTORS`` environment variable is an equivalent.
+
+.. code-block:: ini
+
+  AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
+
+Optionally, you can separate the entries with whitespace, which is useful if you're providing them as part of a YAML file.
+
+.. code-block:: yaml
+
+  AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
+    full.path.to.FirstExtractor;
+    full.path.to.SecondExtractor
+
+
+Remember to make sure that the path is importable for both the scheduler and the worker.
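As a quick sanity check, you can verify importability with a short helper like the sketch below (``validate_extractor_paths`` is a hypothetical name, not part of the provider):

```python
import importlib


def validate_extractor_paths(paths: str) -> list[str]:
    """Hypothetical helper: report extractor paths that are not importable.

    Accepts the same semicolon-separated format as the ``extractors`` option.
    """
    errors = []
    for path in paths.split(";"):
        path = path.strip()
        if not path:
            continue
        # Split "full.path.to.ExtractorClass" into module and class name.
        module_name, _, class_name = path.rpartition(".")
        try:
            module = importlib.import_module(module_name)
            getattr(module, class_name)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{path}: {exc}")
    return errors
```

Running such a check in both the scheduler and worker environments catches path mistakes before any DAG runs.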
+
+Debugging Custom Extractor
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There are two common problems associated with custom Extractors.
+
+The first is a wrong path provided to the ``extractors`` option in Airflow configuration. The path needs to be exactly the same as the one you'd use from your code.
+If the path is wrong or not importable from the worker, the plugin will fail to load the Extractor and proper OpenLineage events for that Operator won't be emitted.
+
+The second, and maybe more insidious, problem is imports from Airflow. Because the OpenLineage code gets instantiated when the Airflow worker itself starts,
+any import from Airflow can be unnoticeably cyclical. This causes the OpenLineage extraction to fail.
+
+To avoid this issue, import from Airflow only locally - in the ``_execute_extraction`` or ``extract_on_complete`` methods.
+If you need imports for type checking, guard them behind ``typing.TYPE_CHECKING``.
+
+
+Testing Custom Extractor
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+Like all code, custom Extractors should be tested. This section provides some information about the most important
+data structures to write tests for, and some notes on troubleshooting. We assume prior knowledge of writing custom Extractors.
+To learn more about how Operators and Extractors work together under the hood,
check out :ref:`custom_extractors:openlineage`.
+
+When testing an Extractor, we first want to verify that the ``OperatorLineage`` object is created,
+specifically that it is built with the correct input and output datasets and the relevant facets.
+In OpenLineage this is done via pytest, with appropriate mocking and patching for connections and objects.
+Check out `example tests
<https://github.com/apache/airflow/blob/main/tests/providers/openlineage/extractors/test_base.py>`_.
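The core assertion pattern can be sketched without an Airflow installation by using simple stand-ins for ``Dataset`` and ``OperatorLineage`` (both dataclasses below are hypothetical mirrors of the real attrs-based classes):

```python
from dataclasses import dataclass, field


@dataclass
class Dataset:
    """Stand-in for openlineage.client.run.Dataset."""

    namespace: str
    name: str


@dataclass
class OperatorLineage:
    """Stand-in for the provider's OperatorLineage."""

    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)
    run_facets: dict = field(default_factory=dict)
    job_facets: dict = field(default_factory=dict)


def test_extractor_builds_expected_lineage():
    # In a real test, lineage would come from YourExtractor(operator).extract()
    # with connections and hooks mocked out.
    lineage = OperatorLineage(
        inputs=[Dataset(namespace="bigquery", name="project.dataset.input_table")],
        outputs=[Dataset(namespace="s3://bucket", name="output.csv")],
    )
    assert [d.name for d in lineage.inputs] == ["project.dataset.input_table"]
    assert lineage.outputs[0].namespace == "s3://bucket"
    assert lineage.run_facets == {}
```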
+
+Testing each facet is also important, as data or graphs in the UI can render
incorrectly if the facets are wrong.
+For example, if the facet name is created incorrectly in the Extractor, then
the Operator's task will not show up in the lineage graph,
+creating a gap in pipeline observability.
+
+Even with unit tests, an Extractor may still not operate as expected.
+The easiest way to tell that data isn't coming through correctly is when elements are missing or rendered incorrectly in the Lineage tab of the UI.
+
+See :ref:`local_troubleshooting:openlineage` for details on how to
troubleshoot OpenLineage locally.
+
+Example
+^^^^^^^
+
+This is an example of a simple Extractor for an Operator that executes an export query in BigQuery and saves the result to a file in S3.
+Some information is known before the Operator's ``execute`` method is called, so we can already extract some lineage in the ``_execute_extraction`` method.
+After the Operator's ``execute`` method is called, in ``extract_on_complete``, we can simply attach some additional facets,
+e.g. with the BigQuery job ID, to what we've prepared earlier. This way, we get all possible information from the Operator.
+
+Please note that this is just an example. There are OpenLineage built-in features that can facilitate different processes,
+like extracting column-level lineage and inputs/outputs from a SQL query with the SQL parser.
+
+.. code-block:: python
+
+    from typing import Any
+
+    from openlineage.client.facet import ExternalQueryRunFacet, SqlJobFacet
+    from openlineage.client.run import Dataset
+
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
+
+
+    class ExampleOperator(BaseOperator):
+        def __init__(self, query, bq_table_reference, s3_path, s3_file_name, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.bq_table_reference = bq_table_reference
+            self.s3_path = s3_path
+            self.s3_file_name = s3_file_name
+            self.query = query
+            self._job_id = None
+
+        def execute(self, context) -> Any:
+            # run_query is a placeholder for the actual BigQuery job submission
+            self._job_id = run_query(query=self.query)
+
+
+    class ExampleExtractor(BaseExtractor):
+        @classmethod
+        def get_operator_classnames(cls):
+            return ["ExampleOperator"]
+
+        def _execute_extraction(self) -> OperatorLineage:
+            """Define what we know before the Operator's execute is called."""
+            return OperatorLineage(
+                inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
+                outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
+                job_facets={
+                    "sql": SqlJobFacet(
+                        query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
+                    )
+                },
+            )
+
+        def extract_on_complete(self) -> OperatorLineage:
+            """Add what we received after the Operator's execute call."""
+            lineage_metadata = self.extract()
+            lineage_metadata.run_facets = {
+                "externalQuery": ExternalQueryRunFacet(externalQueryId=self.operator._job_id, source="bigquery")
+            }
+            return lineage_metadata
+
+For more examples of OpenLineage Extractors, check out the source code of
+`BashExtractor
<https://github.com/apache/airflow/blob/main/airflow/providers/openlineage/extractors/bash.py>`_
or
+`PythonExtractor
<https://github.com/apache/airflow/blob/main/airflow/providers/openlineage/extractors/python.py>`_.
+
+.. _inlets_outlets:openlineage:
+
+Manually annotated lineage
+==========================
+
+This approach is rarely recommended and should be used only in very specific cases, when it's impossible to extract some lineage information from the Operator itself.
+If you want to extract lineage from your own Operators, you may prefer
directly implementing OpenLineage methods as described in
:ref:`openlineage_methods:openlineage`.
+When dealing with Operators that you cannot modify (e.g. third-party providers) but still want lineage extracted from them, see :ref:`custom_extractors:openlineage`.
+
+Airflow allows Operators to track lineage by specifying the input and outputs
of the Operators via
+`inlets and outlets
<https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage>`_.
+OpenLineage will, by default, use inlets and outlets as input/output datasets
if it cannot find any successful extraction from the OpenLineage methods or the
Extractors.
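A minimal DAG fragment using this mechanism might look like the following sketch; the task and table names are illustrative, and the snippet assumes Airflow 2.x import paths:

```python
from airflow.lineage.entities import Table
from airflow.operators.bash import BashOperator

# Lineage is declared manually via inlets/outlets; OpenLineage falls back to
# these when no extraction succeeds for the Operator.
copy_task = BashOperator(
    task_id="copy_orders",
    bash_command="echo 'copying orders'",
    inlets=[Table(cluster="c1", database="staging", name="orders_raw")],
    outlets=[Table(cluster="c1", database="analytics", name="orders")],
)
```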
+
+.. important::
+
+    Airflow allows inlets and outlets to be a Table, Column, File or User entity. However, OpenLineage currently extracts lineage only from Table entities.
Review Comment:
#37744