This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8acd7c031b0 doc: Refresh OpenLineage provider docs (#60462)
8acd7c031b0 is described below
commit 8acd7c031b0b2d1dfad552cacd442815a17d660e
Author: Kacper Muda <[email protected]>
AuthorDate: Mon Jan 19 11:14:22 2026 +0100
doc: Refresh OpenLineage provider docs (#60462)
---
.../sphinx_exts/templates/openlineage.rst.jinja2 | 35 +-
docs/spelling_wordlist.txt | 1 +
providers/openlineage/docs/configurations-ref.rst | 263 ++++++++++
providers/openlineage/docs/guides/developer.rst | 249 +++++----
providers/openlineage/docs/guides/structure.rst | 105 +++-
providers/openlineage/docs/guides/user.rst | 561 ---------------------
providers/openlineage/docs/index.rst | 10 +-
providers/openlineage/docs/macros.rst | 401 ++++++++++++++-
providers/openlineage/docs/spark.rst | 280 ++++++++++
providers/openlineage/docs/supported_classes.rst | 22 +-
providers/openlineage/docs/troubleshooting.rst | 165 ++++++
providers/openlineage/provider.yaml | 148 +++---
.../providers/openlineage/get_provider_info.py | 112 ++--
13 files changed, 1472 insertions(+), 880 deletions(-)
diff --git a/devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
b/devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
index 27455bcc275..aedae7f6e38 100644
--- a/devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
+++ b/devel-common/src/sphinx_exts/templates/openlineage.rst.jinja2
@@ -28,40 +28,7 @@ below that support OpenLineage.
Spark operators
===============
-The OpenLineage integration can automatically inject information into Spark
application properties when its being submitted from Airflow.
-The following is a list of supported operators along with the corresponding
information that can be injected.
-See :ref:`automatic injection of parent job information
<options:spark_inject_parent_job_info>` for more details.
-
-
-apache-airflow-providers-apache-livy
-"""""""""""""""""""""""""""""""""""""
-
-- :class:`~airflow.providers.apache.livy.operators.livy.LivyOperator`
- - Parent Job Information
- - Transport Information (only HTTP transport is supported for now (with
api_key auth, if any))
-
-
-apache-airflow-providers-apache-spark
-"""""""""""""""""""""""""""""""""""""
-
--
:class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
- - Parent Job Information
- - Transport Information (only HTTP transport is supported for now (with
api_key auth, if any))
-
-
-apache-airflow-providers-google
-"""""""""""""""""""""""""""""""
-
--
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- - Parent Job Information
- - Transport Information (only HTTP transport is supported for now (with
api_key auth, if any))
--
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
- - Parent Job Information
- - Transport Information (only HTTP transport is supported for now (with
api_key auth, if any))
--
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- - Parent Job Information
- - Transport Information (only HTTP transport is supported for now (with
api_key auth, if any))
-
+See :ref:`spark:openlineage`.
.. _sql-operators:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index e5e7ecee629..2ff9ecdb6aa 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -613,6 +613,7 @@ emr
EmrAddSteps
EmrCreateJobFlow
enableAutoScale
+enablement
encodable
encryptor
enqueue
diff --git a/providers/openlineage/docs/configurations-ref.rst
b/providers/openlineage/docs/configurations-ref.rst
index 5a6c33d24bb..258c9761d56 100644
--- a/providers/openlineage/docs/configurations-ref.rst
+++ b/providers/openlineage/docs/configurations-ref.rst
@@ -20,3 +20,266 @@
.. include::
/../../../devel-common/src/sphinx_exts/includes/providers-configurations-ref.rst
.. include::
/../../../devel-common/src/sphinx_exts/includes/sections-and-options.rst
+
+
+Highlighted configurations
+===========================
+
+.. _configuration_transport:openlineage:
+
+Transport setup
+----------------
+
+At minimum, the one thing that needs to be set up for OpenLineage to function is the ``Transport`` - where you wish
+your events to end up - for example `Marquez <https://marquezproject.ai/>`_.
+
+Transport as JSON string
+^^^^^^^^^^^^^^^^^^^^^^^^
+The ``transport`` option in the ``openlineage`` section of the Airflow configuration is used for that purpose.
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
+
+``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable is an equivalent.
+
+.. code-block:: ini
+
+ AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url":
"http://example.com:5000", "endpoint": "api/v1/lineage"}'
+
+
+If you want to look at OpenLineage events without sending them anywhere, you
can set up ``ConsoleTransport`` - the events will end up in task logs.
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "console"}
+
+.. note::
+ For a full list of built-in transport types, each transport's options, and
instructions on how to implement your own custom transport, refer to
+ `Python client documentation
<https://openlineage.io/docs/client/python/configuration#transports>`_.
+
+Transport as config file
+^^^^^^^^^^^^^^^^^^^^^^^^
+You can also configure the OpenLineage ``Transport`` using a YAML file (e.g.
``openlineage.yml``).
+Provide the path to the YAML file as ``config_path`` option in Airflow
configuration.
+
+.. code-block:: ini
+
+ [openlineage]
+ config_path = '/path/to/openlineage.yml'
+
+``AIRFLOW__OPENLINEAGE__CONFIG_PATH`` environment variable is an equivalent.
+
+.. code-block:: ini
+
+ AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
+
+Example content of config YAML file:
+
+.. code-block:: yaml
+
+ transport:
+ type: http
+ url: https://backend:5000
+ endpoint: events/receive
+ auth:
+ type: api_key
+ apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
+
+.. note::
+
+ Detailed description, together with example config files, can be found `in
Python client documentation
<https://openlineage.io/docs/client/python/configuration#transports>`_.
+
+
+Configuration precedence
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The primary and recommended method of configuring the OpenLineage Airflow Provider is Airflow configuration.
+As there are multiple possible ways of configuring OpenLineage, it's important
to keep in mind the precedence of different configurations.
+OpenLineage Airflow Provider looks for the configuration in the following
order:
+
+1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
+2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
+3. If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in the `Python client documentation <https://openlineage.io/docs/client/python/configuration>`_. Please note that **using Airflow configuration is encouraged** and is the only future-proof solution.
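+
+For illustration, in the following (placeholder) configuration both ``config_path`` and ``transport`` are set; in that
+case the file referenced by ``config_path`` is used and the inline ``transport`` value is ignored:
+
+.. code-block:: ini
+
+    [openlineage]
+    # config_path is checked first, so the transport below is ignored
+    config_path = /path/to/openlineage.yml
+    transport = {"type": "console"}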
+
+
+.. _configuration_selective_enable:openlineage:
+
+Enabling OpenLineage on Dag/task level
+---------------------------------------
+
+One can selectively enable OpenLineage for specific Dags and tasks by using
the ``selective_enable`` policy.
+To enable this policy, set the ``selective_enable`` option to True in the
[openlineage] section of your Airflow configuration file:
+
+.. code-block:: ini
+
+ [openlineage]
+ selective_enable = True
+
+``AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE`` environment variable is an
equivalent.
+
+.. code-block:: ini
+
+ AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
+
+
+While ``selective_enable`` enables selective control, the ``disabled`` option
still has precedence.
+If you set ``disabled`` to True in the configuration, OpenLineage will be
disabled for all Dags and tasks regardless of the ``selective_enable`` setting.
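+
+For example, with the following (illustrative) configuration no events are emitted at all, because ``disabled``
+takes precedence:
+
+.. code-block:: ini
+
+    [openlineage]
+    disabled = True
+    selective_enable = True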
+
+Once the ``selective_enable`` policy is enabled, you can choose to enable
OpenLineage
+for individual Dags and tasks using the ``enable_lineage`` and
``disable_lineage`` functions.
+
+1. Enabling Lineage on a Dag:
+
+.. code-block:: python
+
+    from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
+
+    with enable_lineage(DAG(...)):
+        # Tasks within this Dag will have lineage tracking enabled
+        MyOperator(...)
+
+        AnotherOperator(...)
+
+2. Enabling Lineage on a Task:
+
+While enabling lineage on a Dag implicitly enables it for all tasks within
that Dag, you can still selectively disable it for specific tasks:
+
+.. code-block:: python
+
+    from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
+
+    with DAG(...) as dag:
+        t1 = MyOperator(...)
+        t2 = AnotherOperator(...)
+
+        # Enable lineage for the entire Dag
+        enable_lineage(dag)
+
+        # Disable lineage for task t1
+        disable_lineage(t1)
+
+Enabling lineage on the Dag level automatically enables it for all tasks
within that Dag unless explicitly disabled per task.
+
+Enabling lineage on the task level implicitly enables lineage on its Dag.
+This is because each emitting task sends a `ParentRunFacet
<https://openlineage.io/docs/spec/facets/run-facets/parent_run>`_,
+which requires the Dag-level lineage to be enabled in some OpenLineage backend
systems.
+Disabling Dag-level lineage while enabling task-level lineage might cause
errors or inconsistencies.
+
+
+.. _configuration_custom_facets:openlineage:
+
+Custom Facets
+--------------
+To learn more about facets in OpenLineage, please refer to `facet
documentation <https://openlineage.io/docs/spec/facets/>`_.
+
+The OpenLineage spec might not contain all the facets you need to write your
extractor,
+in which case you will have to make your own `custom facets
<https://openlineage.io/docs/spec/facets/custom-facets>`_.
+
+You can also inject your own custom facets in the lineage event's run facet
using the ``custom_run_facets`` Airflow configuration.
+
+Steps to be taken:
+
+1. Write a function that returns the custom facets. You can write as many
custom facet functions as needed.
+2. Register the functions using the ``custom_run_facets`` Airflow
configuration.
+
+Airflow OpenLineage listener will automatically execute these functions during
the lineage event generation and append their return values to the run facet in
the lineage event.
+
+Writing a custom facet function
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- **Input arguments:** The function should accept two input arguments:
``TaskInstance`` and ``TaskInstanceState``.
+- **Function body:** Perform the logic needed to generate the custom facets. The custom facets must inherit from ``RunFacet`` so that ``_producer`` and ``_schemaURL`` are automatically added to the facet.
+- **Return value:** The custom facets to be added to the lineage event. The return type should be ``dict[str, RunFacet]`` or ``None``. You may choose to return ``None`` if you do not want to add custom facets under certain conditions.
+
+**Example custom facet function**
+
+.. code-block:: python
+
+    import attrs
+    from airflow.models.taskinstance import TaskInstance, TaskInstanceState
+    from airflow.providers.common.compat.openlineage.facet import RunFacet
+
+
+    @attrs.define
+    class MyCustomRunFacet(RunFacet):
+        """Define a custom facet."""
+
+        name: str
+        jobState: str
+        uniqueName: str
+        displayName: str
+        dagId: str
+        taskId: str
+        cluster: str
+        custom_metadata: dict
+
+
+    def get_my_custom_facet(
+        task_instance: TaskInstance, ti_state: TaskInstanceState
+    ) -> dict[str, RunFacet] | None:
+        operator_name = task_instance.task.operator_name
+        custom_metadata = {}
+        if operator_name == "BashOperator":
+            return None
+        if ti_state == TaskInstanceState.FAILED:
+            custom_metadata["custom_key_failed"] = "custom_value"
+        job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
+        return {
+            "additional_run_facet": MyCustomRunFacet(
+                name="test-lineage-namespace",
+                jobState=task_instance.state,
+                uniqueName=job_unique_name,
+                displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
+                dagId=task_instance.dag_id,
+                taskId=task_instance.task_id,
+                cluster="TEST",
+                custom_metadata=custom_metadata,
+            )
+        }
+
+Register the custom facet functions
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Use the ``custom_run_facets`` Airflow configuration option to register the custom run facet functions by passing
+a string of semicolon-separated full import paths to the functions.
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
+ custom_run_facets =
full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
+
+``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an
equivalent.
+
+.. code-block:: ini
+
+
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
+
+.. note::
+
+ - The custom facet functions are executed both at the START and COMPLETE/FAIL of the TaskInstance, and their results are added to the corresponding OpenLineage event.
+ - When creating conditions on TaskInstance state, use the second argument provided (``TaskInstanceState``), which contains the state the task should be in. This may differ from ``ti.current_state()``, as the OpenLineage listener may get called before the TaskInstance's state is updated in the Airflow database.
+ - When the path to a single function is registered more than once, it will still be executed only once.
+ - When duplicate custom facet keys are returned by multiple registered functions, the result of a random one will be added to the lineage event. Please avoid using duplicate facet keys, as this can produce unexpected behaviour.
+
+
+.. _configuration_backwards_compatibility:openlineage:
+
+Backwards compatibility
+------------------------
+
+.. warning::
+
+ The variables below **should not** be used and may be removed in the future. Consider using Airflow configuration (described above) for a future-proof solution.
+
+For backwards compatibility with ``openlineage-airflow`` package, some
environment variables are still available:
+
+- ``OPENLINEAGE_DISABLED`` is an equivalent of
``AIRFLOW__OPENLINEAGE__DISABLED``.
+- ``OPENLINEAGE_CONFIG`` is an equivalent of
``AIRFLOW__OPENLINEAGE__CONFIG_PATH``.
+- ``OPENLINEAGE_NAMESPACE`` is an equivalent of
``AIRFLOW__OPENLINEAGE__NAMESPACE``.
+- ``OPENLINEAGE_EXTRACTORS`` is an equivalent of setting
``AIRFLOW__OPENLINEAGE__EXTRACTORS``.
+- ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` is an equivalent of
``AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE``.
+- ``OPENLINEAGE_URL`` can be used to set up simple http transport. This method
has some limitations and may require using other environment variables to
achieve desired output. See `docs
<https://openlineage.io/docs/client/python/configuration#transports>`_.
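+
+As an illustration only (values are placeholders), a legacy setup and its recommended Airflow-configuration
+equivalent could look like:
+
+.. code-block:: shell
+
+    # Legacy openlineage-airflow style variables (discouraged)
+    export OPENLINEAGE_URL=http://example.com:5000
+    export OPENLINEAGE_NAMESPACE=my-team-airflow-instance
+
+    # Recommended Airflow configuration equivalents
+    export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
+    export AIRFLOW__OPENLINEAGE__NAMESPACE=my-team-airflow-instance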
diff --git a/providers/openlineage/docs/guides/developer.rst
b/providers/openlineage/docs/guides/developer.rst
index 02068055c96..c37bd8c366a 100644
--- a/providers/openlineage/docs/guides/developer.rst
+++ b/providers/openlineage/docs/guides/developer.rst
@@ -232,7 +232,7 @@ Inputs and outputs are lists of plain OpenLineage datasets
(`openlineage.client.
``run_facets`` and ``job_facets`` are dictionaries of optional RunFacets and
JobFacets that would be attached to the job - for example,
you might want to attach ``SqlJobFacet`` if your Operator is executing SQL.
-To learn more about facets in OpenLineage see :ref:`custom_facets:openlineage`.
+To learn more about facets in OpenLineage see
:ref:`configuration_custom_facets:openlineage`.
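+
+As a purely illustrative sketch (not part of this change; the import paths and the ``SQLJobFacet`` name are
+assumptions based on the provider's public interface), attaching a SQL job facet could look roughly like this:
+
+.. code-block:: python
+
+    from airflow.providers.common.compat.openlineage.facet import SQLJobFacet
+    from airflow.providers.openlineage.extractors import OperatorLineage
+
+    # Attach the executed SQL as a job facet; inputs, outputs and run_facets are omitted here
+    lineage = OperatorLineage(job_facets={"sql": SQLJobFacet(query="SELECT * FROM table1")})
+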
Registering Custom Extractor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -373,172 +373,163 @@ For more examples of OpenLineage Extractors, check out
the source code of
`BashExtractor
<https://github.com/apache/airflow/blob/main/providers/openlineage/src/airflow/providers/openlineage/extractors/bash.py>`_
or
`PythonExtractor
<https://github.com/apache/airflow/blob/main/providers/openlineage/src/airflow/providers/openlineage/extractors/python.py>`_.
-.. _custom_facets:openlineage:
-Custom Facets
-=============
-To learn more about facets in OpenLineage, please refer to `facet
documentation <https://openlineage.io/docs/spec/facets/>`_.
-Also check out `available facets
<https://github.com/OpenLineage/OpenLineage/blob/main/client/python/src/openlineage/client/facet.py>`_
-and a blog post about `extending with facets
<https://openlineage.io/blog/extending-with-facets/>`_.
+.. _inlets_outlets:openlineage:
-The OpenLineage spec might not contain all the facets you need to write your
extractor,
-in which case you will have to make your own `custom facets
<https://openlineage.io/docs/spec/facets/custom-facets>`_.
+Manually annotated lineage
+==========================
-You can also inject your own custom facets in the lineage event's run facet
using the ``custom_run_facets`` Airflow configuration.
-
-Steps to be taken,
-
-1. Write a function that returns the custom facets. You can write as many
custom facet functions as needed.
-2. Register the functions using the ``custom_run_facets`` Airflow
configuration.
+This approach is recommended only in very specific cases, when it's impossible to extract some lineage information from the Operator itself.
+If you want to extract lineage from your own Operators, you may prefer directly implementing OpenLineage methods as described in :ref:`openlineage_methods:openlineage`.
+When dealing with Operators that you cannot modify (e.g. third-party providers), but still want lineage to be extracted from them, see :ref:`custom_extractors:openlineage`.
-Airflow OpenLineage listener will automatically execute these functions during
the lineage event generation and append their return values to the run facet in
the lineage event.
+Airflow allows Operators to track lineage by specifying the inputs and outputs of the Operators via
+`inlets and outlets
<https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/assets.html>`_.
+By default, OpenLineage uses inlets and outlets as input and output datasets
when it cannot find any successfully
+extracted datasets from OpenLineage methods or Extractors. When Airflow Assets
are used as inlets and outlets,
+OpenLineage attempts to convert them into OpenLineage Datasets and includes
them as input and output datasets in
+the resulting event. If this conversion is not possible, the inlets and
outlets information is still available in the
+AirflowRunFacet, under task.inlets and task.outlets. When OpenLineage Datasets
are used directly as inlets and outlets,
+no conversion is required. However, this usage is specific to OpenLineage
only: Airflow ignores OpenLineage Datasets
+provided in inlets and outlets, and they are not treated as Airflow Assets.
This mechanism is supported solely for
+OpenLineage's purposes and does not replace or affect Airflow Assets.
-Writing a custom facet function
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-- **Input arguments:** The function should accept two input arguments:
``TaskInstance`` and ``TaskInstanceState``.
-- **Function body:** Perform the logic needed to generate the custom facets.
The custom facets must inherit from the ``RunFacet`` for the ``_producer`` and
``_schemaURL`` to be automatically added for the facet.
-- **Return value:** The custom facets to be added to the lineage event. Return
type should be ``dict[str, RunFacet]`` or ``None``. You may choose to return
``None``, if you do not want to add custom facets for certain criteria.
+Example
+^^^^^^^
-**Example custom facet function**
+An Operator inside an Airflow DAG can be annotated with inlets and outlets as in the example below:
.. code-block:: python
- import attrs
- from airflow.models.taskinstance import TaskInstance, TaskInstanceState
- from airflow.providers.common.compat.openlineage.facet import RunFacet
-
-
- @attrs.define
- class MyCustomRunFacet(RunFacet):
- """Define a custom facet."""
-
- name: str
- jobState: str
- uniqueName: str
- displayName: str
- dagId: str
- taskId: str
- cluster: str
- custom_metadata: dict
-
-
- def get_my_custom_facet(
- task_instance: TaskInstance, ti_state: TaskInstanceState
- ) -> dict[str, RunFacet] | None:
- operator_name = task_instance.task.operator_name
- custom_metadata = {}
- if operator_name == "BashOperator":
- return None
- if ti_state == TaskInstanceState.FAILED:
- custom_metadata["custom_key_failed"] = "custom_value"
- job_unique_name =
f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
- return {
- "additional_run_facet": MyCustomRunFacet(
- name="test-lineage-namespace",
- jobState=task_instance.state,
- uniqueName=job_unique_name,
- displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
- dagId=task_instance.dag_id,
- taskId=task_instance.task_id,
- cluster="TEST",
- custom_metadata=custom_metadata,
- )
- }
+ """Example DAG demonstrating the usage of the extraction via Inlets and
Outlets."""
-Register the custom facet functions
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+ import pendulum
-Use the ``custom_run_facets`` Airflow configuration to register the custom run
facet functions by passing
-a string of semicolon separated full import path to the functions.
+ from airflow import DAG
+ from airflow.lineage.entities import File  # assumed import path for the File entity used below
+ from airflow.providers.standard.operators.bash import BashOperator
+ from openlineage.client.event_v2 import Dataset
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- custom_run_facets =
full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
-
-``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an
equivalent.
-
-.. code-block:: ini
+ t1 = Dataset(namespace="postgres:my-host.com:1234", name="db.sch.t1")
+ t2 = Dataset(namespace="mysql:another-host.com:5678", name="db.sch.t2")
+ f1 = File(url="s3://bucket/dir/file1")
-
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
+ with DAG(
+ dag_id="example_operator",
+ schedule="@once",
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+ ) as dag:
+ task1 = BashOperator(
+ task_id="task_1_with_inlet_outlet",
+ bash_command="exit 0;",
+ inlets=[t1, t2],
+ outlets=[f1],
+ )
-.. note::
- - The custom facet functions are executed both at the START and
COMPLETE/FAIL of the TaskInstance and added to the corresponding OpenLineage
event.
- - When creating conditions on TaskInstance state, you should use second
argument provided (``TaskInstanceState``) that will contain the state the task
should be in. This may vary from ti.current_state() as the OpenLineage listener
may get called before the TaskInstance's state is updated in Airflow database.
- - When path to a single function is registered more than once, it will
still be executed only once.
- - When duplicate custom facet keys are returned by multiple functions
registered, the result of random function result will be added to the lineage
event. Please avoid using duplicate facet keys as it can produce unexpected
behaviour.
+.. _extraction_helpers:openlineage:
-.. _job_hierarchy:openlineage:
+Helper functions
+=================
-Job Hierarchy
-=============
+Some providers expose helper functions that simplify OpenLineage event
emission for SQL queries executed within custom operators.
+These functions are particularly useful when executing multiple queries in a
single task, as they allow you to treat each SQL query
+as a separate child job of the Airflow task, creating a more granular lineage
graph.
-Apache Airflow features an inherent job hierarchy: Dags, large and
independently schedulable units, comprise smaller, executable tasks.
+The helper functions automatically:
-OpenLineage reflects this structure in its Job Hierarchy model.
+- Create START and COMPLETE/FAIL OpenLineage events for each query
+- Link child query jobs to the parent Airflow task using ParentRunFacet
+- Optionally retrieve additional metadata (execution times, query text, error
messages) from the database
+- Handle event serialization and emission to the OpenLineage backend
-- Upon Dag scheduling, a START event is emitted.
-- Subsequently, following Airflow's task order, each task triggers:
+Currently available helper functions:
- - START events at TaskInstance start.
- - COMPLETE/FAILED events upon completion.
+- ``emit_openlineage_events_for_snowflake_queries`` - For Snowflake queries
+- ``emit_openlineage_events_for_databricks_queries`` - For Databricks SQL
queries
-- Finally, upon Dag termination, a completion event (COMPLETE or FAILED) is
emitted.
-TaskInstance events' ParentRunFacet references the originating Dag run.
+Example
+^^^^^^^
-.. _troubleshooting:openlineage:
+When using Airflow hooks (e.g., ``SnowflakeHook``, ``DatabricksSqlHook``), the
helper functions can automatically
+retrieve connection information to build the namespace and extract query IDs
from the hook if the hook's ``query_ids``
+attribute is populated. Some hooks (like ``DatabricksHook``) may not
automatically track query IDs,
+in which case you'll need to provide them explicitly.
-Troubleshooting
-=====================
-When testing code locally, `Marquez
<https://marquezproject.ai/docs/quickstart>`_ can be used to inspect the data
being emitted—or not being emitted.
-Using Marquez will allow you to figure out if the error is being caused by the
Extractor or the API.
-If data is being emitted from the Extractor as expected but isn't making it to
the UI,
-then the Extractor is fine and an issue should be opened up in OpenLineage.
However, if data is not being emitted properly,
-it is likely that more unit tests are needed to cover Extractor behavior.
-Marquez can help you pinpoint which facets are not being formed properly so
you know where to add test coverage.
+.. code-block:: python
-Debug settings
-^^^^^^^^^^^^^^
-For debugging purposes, ensure that both `Airflow logging level
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#logging-level>`_
-and `OpenLineage client logging level
<https://openlineage.io/docs/client/python#environment-variables>`_ is set to
``DEBUG``.
-The latest provider auto-syncs Airflow's logging level with the OpenLineage
client, removing the need for manual configuration.
+ from airflow import task
+ from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+ from airflow.providers.snowflake.utils.openlineage import
emit_openlineage_events_for_snowflake_queries
+ from airflow.utils.context import get_current_context
-For DebugFacet, containing additional information (e.g., list of all packages
installed), to be appended to all OL events
-enable :ref:`debug_mode <options:debug_mode>` for OpenLineage integration.
-Keep in mind that enabling these settings will increase the detail in Airflow
logs (which will increase their size) and
-add extra information to OpenLineage events. It's recommended to use them
temporarily, primarily for debugging purposes.
+ @task
+ def execute_queries():
+ context = get_current_context()
+ task_instance = context["ti"]
+ hook = SnowflakeHook(snowflake_conn_id="snowflake")
-When seeking help with debugging, always try to provide the following:
+ # Execute queries - hook.run() automatically tracks query_ids
+ hook.run("SELECT * FROM table1; INSERT INTO table2 SELECT * FROM
table1;")
-- Airflow scheduler logs with the logging level set to DEBUG
-- Airflow worker logs (task logs) with the logging level set to DEBUG
-- OpenLineage events with debug_mode enabled
-- Information about Airflow version and OpenLineage provider version
-- Information about any custom modifications made to the deployment
environment where the Airflow is running
+ # Emit OpenLineage events for all executed queries
+ emit_openlineage_events_for_snowflake_queries(
+ task_instance=task_instance,
+ hook=hook,
+ query_for_extra_metadata=True, # Fetch query text, execution
times, etc.
+ )
-Where can I learn more?
-=======================
+When executing queries using raw SDKs (e.g., ``snowflake-connector-python``,
``databricks-sql-connector``) or other methods
+that don't use Airflow hooks, you need to manually track query IDs and provide
them to the helper function along with
+the namespace. In this case, the function cannot retrieve additional metadata
from the database.
-- Check out `OpenLineage website <https://openlineage.io>`_.
-- Visit our `GitHub repository <https://github.com/OpenLineage/OpenLineage>`_.
-- Watch multiple `talks <https://openlineage.io/resources#conference-talks>`_
about OpenLineage.
+.. code-block:: python
-Feedback
-========
+ from databricks import sql
+ from airflow import task
+ from airflow.providers.databricks.utils.openlineage import
emit_openlineage_events_for_databricks_queries
+ from airflow.utils.context import get_current_context
-You can reach out to us on `slack <http://bit.ly/OpenLineageSlack>`_ and leave
us feedback!
+ @task
+ def execute_queries():
+ context = get_current_context()
+ task_instance = context["ti"]
+ query_ids = []
-How to contribute
-=================
+ # Connect using raw Databricks SQL connector
+ connection = sql.connect(
+ server_hostname="workspace.cloud.databricks.com",
+ http_path="/sql/1.0/warehouses/warehouse_id",
+ access_token="token",
+ )
+ cursor = connection.cursor()
+
+ try:
+ # Execute queries and capture query IDs
+ result = cursor.execute("SELECT * FROM table1")
+ query_ids.append(result.command_id) # Get query ID from result
+
+ result = cursor.execute("INSERT INTO table2 SELECT * FROM table1")
+ query_ids.append(result.command_id)
+
+ connection.commit()
+ finally:
+ cursor.close()
+ connection.close()
+
+ # Emit OpenLineage events - must provide query_ids and namespace
explicitly
+ emit_openlineage_events_for_databricks_queries(
+ task_instance=task_instance,
+ query_ids=query_ids,
+
query_source_namespace="databricks://workspace.cloud.databricks.com",
+ query_for_extra_metadata=False, # Cannot fetch metadata without
hook
+ )
-We welcome your contributions! OpenLineage is an Open Source project under
active development, and we'd love your help!
+Troubleshooting
+===============
-Sounds fun? Check out our `new contributor guide
<https://github.com/OpenLineage/OpenLineage/blob/main/CONTRIBUTING.md>`_ to get
started.
+See :ref:`troubleshooting:openlineage`.
diff --git a/providers/openlineage/docs/guides/structure.rst
b/providers/openlineage/docs/guides/structure.rst
index 91372d9d793..037d5b5d294 100644
--- a/providers/openlineage/docs/guides/structure.rst
+++ b/providers/openlineage/docs/guides/structure.rst
@@ -24,15 +24,64 @@ OpenLineage is an open framework for data lineage
collection and analysis.
At its core it is an extensible specification that systems can use to
interoperate with lineage metadata.
`Check out OpenLineage docs <https://openlineage.io/docs/>`_.
+**No change to user Dag files is required to use OpenLineage**; only basic configuration is needed so that OpenLineage knows where to send events.
+
+.. important::
+
+ All possible OpenLineage configuration options, with example values, can
be found in :ref:`the configuration section <configuration:openlineage>`.
+
Quickstart
==========
-To instrument your Airflow instance with OpenLineage, see
:ref:`guides/user:openlineage`.
+.. note::
-To implement OpenLineage support for Airflow Operators, see
:ref:`guides/developer:openlineage`.
+ OpenLineage offers a diverse range of data transport options (http, kafka,
file etc.),
+ including the flexibility to create a custom solution. Configuration can
be managed through several approaches
+ and there is an extensive array of settings available for users to
fine-tune and enhance their use of OpenLineage.
+ For a comprehensive explanation of these features, please refer to the
subsequent sections of this documentation.
+
+This example is a basic demonstration of OpenLineage user setup.
+For a development OpenLineage backend that will receive events, you can use `Marquez <https://marquezproject.ai/>`_.
+
+1. Install the provider package or add it to your ``requirements.txt`` file.
+
+ .. code-block:: shell
+
+ pip install apache-airflow-providers-openlineage
+
+2. Provide a ``Transport`` configuration so that OpenLineage knows where to send the events. Within the ``airflow.cfg`` file:
+
+ .. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000",
"endpoint": "api/v1/lineage"}
+
+ or with ``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable
+
+ .. code-block:: ini
+
+ AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url":
"http://example.com:5000", "endpoint": "api/v1/lineage"}'
+
+3. **That's it!** When Dags are run, the integration will automatically:
+
+ - Collect task input / output metadata (source, schema, etc.).
+ - Collect task run-level metadata (execution time, state, parameters, etc.)
+ - Collect task job-level metadata (owners, type, description, etc.)
+ - Collect task-specific metadata (bigquery job id, python source code, etc.)
- depending on the Operator
+
+ All this data will be sent as `OpenLineage events
<https://openlineage.io/docs/spec/object-model#event-payload-structure>`_ to
the configured backend.
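+
+To get a rough idea of what is emitted, a single, heavily truncated and purely illustrative run event could look
+like the sketch below (all values are placeholders; see the spec linked above for the full payload structure):
+
+.. code-block:: json
+
+    {
+      "eventType": "COMPLETE",
+      "eventTime": "2025-01-01T10:00:00.000000Z",
+      "job": {"namespace": "default", "name": "my_dag.my_task"},
+      "run": {"runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072", "facets": {}},
+      "inputs": [{"namespace": "postgres://my-host:5432", "name": "db.sch.t1"}],
+      "outputs": []
+    }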
+
+Next steps
+===========
+See :ref:`the configuration page <configuration:openlineage>` for more details
on how to fine-tune OpenLineage to your needs.
+
+See :ref:`guides/developer:openlineage` for details on how to add OpenLineage
functionality to your Operator.
+
+See :ref:`howto/macros:openlineage` for available macros and details on how
OpenLineage defines job hierarchy.
-What's in it for me ?
-=====================
+
+Benefits of Data Lineage
+=========================
The metadata collected can answer questions like:
@@ -51,8 +100,6 @@ Understanding complex inter-Dag dependencies and providing
up-to-date runtime vi
OpenLineage integrates with Airflow to collect Dag lineage metadata so that
inter-Dag dependencies are easily maintained
and viewable via a lineage graph, while also keeping a catalog of historical
runs of Dags.
-For OpenLineage backend that will receive events, you can use `Marquez
<https://marquezproject.ai/>`_
-
How it works under the hood ?
=============================
@@ -65,3 +112,49 @@ For Dags, the listener runs in Airflow Scheduler. For
TaskInstances, the listene
When TaskInstance listener method gets called, the ``OpenLineageListener``
constructs metadata like event's unique ``run_id`` and event time.
Then, it tries to extract metadata from Airflow Operators as described in
:ref:`extraction_precedence:openlineage`.
+
+.. _client_v_provider:openlineage:
+
+OpenLineage provider vs client
+==============================
+
+The OpenLineage integration consists of two separate packages that work
together:
+
+- ``apache-airflow-providers-openlineage`` (OpenLineage Airflow provider,
this package) - Serves as the
+ Airflow integration layer for OpenLineage. It extracts metadata from Airflow
tasks and DAGs, implements
+ the Airflow listener hooks, provides extractors for various operators, and
passes the extracted metadata
+ to the OpenLineage client for transmission. Keep the provider at the latest
available version supported
+ by your Airflow version to ensure accurate and complete lineage capture.
+
+- ``openlineage-python`` (OpenLineage client) - Responsible for sending
lineage metadata
+ from Airflow to the OpenLineage backend. It handles transport configuration,
event serialization, and
+ communication with the backend. The client can be safely upgraded
independently of Airflow and the provider
+ versions to take advantage of the latest fixes, performance improvements,
and features.
+
+The provider extracts Airflow-specific metadata and formats it into
OpenLineage events, while the client
+handles the actual transmission of those events to your OpenLineage backend.
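+
+For example, installing and upgrading the two packages could look like this (a sketch; pin versions according to
+your environment):
+
+.. code-block:: shell
+
+    # Provider - keep it in line with what your Airflow version supports
+    pip install apache-airflow-providers-openlineage
+
+    # Client - can be upgraded independently for the latest transports and fixes
+    pip install --upgrade openlineage-python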
+
+Troubleshooting
+=====================
+
+See :ref:`troubleshooting:openlineage`.
+
+Where can I learn more?
+=======================
+
+- Check out `OpenLineage website <https://openlineage.io>`_.
+- Visit our `GitHub repository <https://github.com/OpenLineage/OpenLineage>`_.
+- Watch multiple `talks <https://openlineage.io/resources#conference-talks>`_
about OpenLineage.
+
+Feedback
+========
+
+You can reach out to us on `slack <http://bit.ly/OpenLineageSlack>`_ and leave
us feedback!
+
+
+How to contribute
+=================
+
+We welcome your contributions! OpenLineage is an Open Source project under
active development, and we'd love your help!
+
+Sounds fun? Check out our `new contributor guide
<https://github.com/OpenLineage/OpenLineage/blob/main/CONTRIBUTING.md>`_ to get
started.
diff --git a/providers/openlineage/docs/guides/user.rst
b/providers/openlineage/docs/guides/user.rst
deleted file mode 100644
index 50961dc08a5..00000000000
--- a/providers/openlineage/docs/guides/user.rst
+++ /dev/null
@@ -1,561 +0,0 @@
-
- .. Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- .. http://www.apache.org/licenses/LICENSE-2.0
-
- .. Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
-.. _guides/user:openlineage:
-
-Using OpenLineage integration
------------------------------
-
-OpenLineage is an open framework for data lineage collection and analysis. At
its core is an extensible specification that systems can use to interoperate
with lineage metadata.
-`Check out OpenLineage docs <https://openlineage.io/docs/>`_.
-
-**No change to user Dag files is required to use OpenLineage**. Basic
configuration is needed so that OpenLineage knows where to send events.
-
-Quickstart
-==========
-
-.. note::
-
- OpenLineage Provider offers a diverse range of data transport options
(http, kafka, file etc.),
- including the flexibility to create a custom solution. Configuration can
be managed through several approaches
- and there is an extensive array of settings available for users to
fine-tune and enhance their use of OpenLineage.
- For a comprehensive explanation of these features, please refer to the
subsequent sections of this document.
-
-This example is a basic demonstration of OpenLineage setup.
-
-1. Install provider package or add it to ``requirements.txt`` file.
-
- .. code-block:: ini
-
- pip install apache-airflow-providers-openlineage
-
-2. Provide a ``Transport`` configuration so that OpenLineage knows where to
send the events. Within ``airflow.cfg`` file
-
- .. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000",
"endpoint": "api/v1/lineage"}
-
- or with ``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable
-
- .. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url":
"http://example.com:5000", "endpoint": "api/v1/lineage"}'
-
-3. **That's it !** OpenLineage events should be sent to the configured
backend when Dags are run.
-
-Usage
-=====
-
-When enabled and configured, the integration requires no further action from
the user. It will automatically:
-
-- Collect task input / output metadata (source, schema, etc.).
-- Collect task run-level metadata (execution time, state, parameters, etc.)
-- Collect task job-level metadata (owners, type, description, etc.)
-- Collect task-specific metadata (bigquery job id, python source code, etc.) -
depending on the Operator
-
-All this data will be sent as OpenLineage events to the configured backend as
described in :ref:`job_hierarchy:openlineage`.
-
-Transport setup
-===============
-
-Primary, and recommended method of configuring OpenLineage Airflow Provider is
Airflow configuration (``airflow.cfg`` file).
-All possible configuration options, with example values, can be found in
:ref:`the configuration section <configuration:openlineage>`.
-
-At minimum, one thing that needs to be set up in every case is ``Transport`` -
where do you wish for
-your events to end up - for example `Marquez <https://marquezproject.ai/>`_.
-
-Transport as JSON string
-^^^^^^^^^^^^^^^^^^^^^^^^
-The ``transport`` option in Airflow configuration is used for that purpose.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
-
-``AIRFLOW__OPENLINEAGE__TRANSPORT`` environment variable is an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url":
"http://example.com:5000", "endpoint": "api/v1/lineage"}'
-
-
-If you want to look at OpenLineage events without sending them anywhere, you
can set up ``ConsoleTransport`` - the events will end up in task logs.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "console"}
-
-.. note::
- For full list of built-in transport types, specific transport's options or
instructions on how to implement your custom transport, refer to
- `Python client documentation
<https://openlineage.io/docs/client/python#built-in-transport-types>`_.
-
-Transport as config file
-^^^^^^^^^^^^^^^^^^^^^^^^
-You can also configure OpenLineage ``Transport`` using a YAML file (f.e.
``openlineage.yml``).
-Provide the path to the YAML file as ``config_path`` option in Airflow
configuration.
-
-.. code-block:: ini
-
- [openlineage]
- config_path = '/path/to/openlineage.yml'
-
-``AIRFLOW__OPENLINEAGE__CONFIG_PATH`` environment variable is an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
-
-Example content of config YAML file:
-
-.. code-block:: ini
-
- transport:
- type: http
- url: https://backend:5000
- endpoint: events/receive
- auth:
- type: api_key
- apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
-
-.. note::
-
- Detailed description of that configuration method, together with example
config files,
- can be found `in Python client documentation
<https://openlineage.io/docs/client/python#built-in-transport-types>`_.
-
-Configuration precedence
-^^^^^^^^^^^^^^^^^^^^^^^^
-
-As there are multiple possible ways of configuring OpenLineage, it's important
to keep in mind the precedence of different configurations.
-OpenLineage Airflow Provider looks for the configuration in the following
order:
-
-1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
-2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
-3. If all the above options are missing, the OpenLineage Python client used
underneath looks for configuration in the order described in `this
<https://openlineage.io/docs/client/python#configuration>`_ documentation.
Please note that **using Airflow configuration is encouraged** and is the only
future proof solution.
-
-Backwards compatibility
-^^^^^^^^^^^^^^^^^^^^^^^
-
-.. warning::
-
- Below variables **should not** be used and can be removed in the future.
Consider using Airflow configuration (described above) for a future proof
solution.
-
-For backwards compatibility with ``openlineage-airflow`` package, some
environment variables are still available:
-
-- ``OPENLINEAGE_DISABLED`` is an equivalent of
``AIRFLOW__OPENLINEAGE__DISABLED``.
-- ``OPENLINEAGE_CONFIG`` is an equivalent of
``AIRFLOW__OPENLINEAGE__CONFIG_PATH``.
-- ``OPENLINEAGE_NAMESPACE`` is an equivalent of
``AIRFLOW__OPENLINEAGE__NAMESPACE``.
-- ``OPENLINEAGE_EXTRACTORS`` is an equivalent of setting
``AIRFLOW__OPENLINEAGE__EXTRACTORS``.
-- ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` is an equivalent of
``AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE``.
-- ``OPENLINEAGE_URL`` can be used to set up simple http transport. This method
has some limitations and may require using other environment variables to
achieve desired output. See `docs
<https://openlineage.io/docs/client/python#http-transport-configuration-with-environment-variables>`_.
-
-
-Additional Options
-==================
-
-Namespace
-^^^^^^^^^
-
-It's very useful to set up OpenLineage namespace for this particular instance.
-That way, if you use multiple OpenLineage producers, events coming from them
will be logically separated.
-If not set, it's using ``default`` namespace. Provide the name of the
namespace as ``namespace`` option in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- namespace = 'my-team-airflow-instance'
-
-``AIRFLOW__OPENLINEAGE__NAMESPACE`` environment variable is an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
-
-Timeout
-^^^^^^^
-
-To add a layer of isolation between task execution and OpenLineage, adding a
level of assurance that OpenLineage execution does not
-interfere with task execution in a way other than taking time, OpenLineage
methods run in separate process.
-The code runs with default timeout of 10 seconds. You can increase this by
setting the ``execution_timeout`` value.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- execution_timeout = 60
-
-``AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
-
-.. _options:disable:
-
-Disable
-^^^^^^^
-You can disable sending OpenLineage events without uninstalling OpenLineage
provider by setting
-``disabled`` option to ``true`` in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- disabled = true
-
-``AIRFLOW__OPENLINEAGE__DISABLED`` environment variable is an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__DISABLED=true
-
-
-Disable source code
-^^^^^^^^^^^^^^^^^^^
-
-Several Operators (f.e. Python, Bash) will by default include their source
code in their OpenLineage events.
-To prevent that, set ``disable_source_code`` option to ``true`` in Airflow
configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- disable_source_code = true
-
-``AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true
-
-
-Disabled for Operators
-^^^^^^^^^^^^^^^^^^^^^^
-
-You can easily exclude some Operators from emitting OpenLineage events by
passing a string of semicolon separated
-full import paths of Airflow Operators to disable as
``disabled_for_operators`` field in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- disabled_for_operators =
'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
-
-``AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
-
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator'
-
-Full Task Info
-^^^^^^^^^^^^^^
-
-By default, OpenLineage integration's AirflowRunFacet - attached on START
event for every task instance event - does
-not contain full serialized task information (parameters to given operator),
but only includes select parameters.
-
-However, we allow users to set OpenLineage integration to include full task
information. By doing this, rather than
-serializing only a few known attributes, we exclude certain non-serializable
elements and send everything else.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- include_full_task_info = true
-
-``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
-
-.. warning::
-
- By setting this variable to true, OpenLineage integration does not control
the size of event you sent. It can potentially include elements that are
megabytes in size or larger, depending on the size of data you pass to the task.
-
-
-Custom Extractors
-^^^^^^^^^^^^^^^^^
-
-To use :ref:`custom Extractors <custom_extractors:openlineage>` feature,
register the extractors by passing
-a string of semicolon separated Airflow Operators full import paths to
``extractors`` option in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
-
-``AIRFLOW__OPENLINEAGE__EXTRACTORS`` environment variable is an equivalent.
-
-.. code-block:: ini
-
-
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
-
-Custom Run Facets
-^^^^^^^^^^^^^^^^^
-
-To inject :ref:`custom run facets <custom_facets:openlineage>`, register the
custom run facet functions by passing
-a string of semicolon separated full import paths to ``custom_run_facets``
option in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- custom_run_facets =
full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
-
-``AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
-
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
-
-.. _options:debug_mode:
-
-Debug Mode
-^^^^^^^^^^
-
-You can enable sending additional information in OpenLineage events that can
be useful for debugging and
-reproducing your environment setup by setting ``debug_mode`` option to
``true`` in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- debug_mode = true
-
-``AIRFLOW__OPENLINEAGE__DEBUG_MODE`` environment variable is an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
-
-.. warning::
-
- By setting this variable to true, OpenLineage integration may log and emit
extensive details. It should only be enabled temporary for debugging purposes.
-
-
-Enabling OpenLineage on Dag/task level
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-One can selectively enable OpenLineage for specific Dags and tasks by using
the ``selective_enable`` policy.
-To enable this policy, set the ``selective_enable`` option to True in the
[openlineage] section of your Airflow configuration file:
-
-.. code-block:: ini
-
- [openlineage]
- selective_enable = True
-
-``AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE`` environment variable is an
equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
-
-
-While ``selective_enable`` enables selective control, the ``disabled``
:ref:`option <options:disable>` still has precedence.
-If you set ``disabled`` to True in the configuration, OpenLineage will be
disabled for all Dags and tasks regardless of the ``selective_enable`` setting.
-
-Once the ``selective_enable`` policy is enabled, you can choose to enable
OpenLineage
-for individual Dags and tasks using the ``enable_lineage`` and
``disable_lineage`` functions.
-
-1. Enabling Lineage on a Dag:
-
-.. code-block:: python
-
- from airflow.providers.openlineage.utils.selective_enable import
disable_lineage, enable_lineage
-
- with enable_lineage(Dag(...)):
- # Tasks within this Dag will have lineage tracking enabled
- MyOperator(...)
-
- AnotherOperator(...)
-
-2. Enabling Lineage on a Task:
-
-While enabling lineage on a Dag implicitly enables it for all tasks within
that Dag, you can still selectively disable it for specific tasks:
-
-.. code-block:: python
-
- from airflow.providers.openlineage.utils.selective_enable import
disable_lineage, enable_lineage
-
- with DAG(...) as dag:
- t1 = MyOperator(...)
- t2 = AnotherOperator(...)
-
- # Enable lineage for the entire Dag
- enable_lineage(dag)
-
- # Disable lineage for task t1
- disable_lineage(t1)
-
-Enabling lineage on the Dag level automatically enables it for all tasks
within that Dag unless explicitly disabled per task.
-
-Enabling lineage on the task level implicitly enables lineage on its Dag.
-This is because each emitting task sends a `ParentRunFacet
<https://openlineage.io/docs/spec/facets/run-facets/parent_run>`_,
-which requires the Dag-level lineage to be enabled in some OpenLineage backend
systems.
-Disabling Dag-level lineage while enabling task-level lineage might cause
errors or inconsistencies.
-
-.. _options:spark_inject_parent_job_info:
-
-Passing parent job information to Spark jobs
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-OpenLineage integration can automatically inject Airflow's information
(namespace, job name, run id)
-into Spark application properties as parent job information
-(``spark.openlineage.parentJobNamespace``,
``spark.openlineage.parentJobName``, ``spark.openlineage.parentRunId``),
-for :ref:`supported Operators <supported_classes:openlineage>`.
-It allows Spark integration to automatically include ``parentRunFacet`` in
application-level OpenLineage event,
-creating a parent-child relationship between tasks from different integrations.
-See `Scheduling from Airflow
<https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.
-
-This configuration serves as the default behavior for all Operators that
support automatic Spark properties injection,
-unless it is explicitly overridden at the Operator level.
-To prevent a specific Operator from injecting the parent job information while
-allowing all other supported Operators to do so by default,
``openlineage_inject_parent_job_info=False``
-can be explicitly provided to that specific Operator.
-
-.. note::
-
- If any of the ``spark.openlineage.parent*`` properties are manually
specified in the Spark job configuration, the integration will refrain from
injecting parent job properties to ensure that manually provided values are
preserved.
-
-You can enable this automation by setting ``spark_inject_parent_job_info``
option to ``true`` in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- spark_inject_parent_job_info = true
-
-``AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO`` environment variable is
an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
-
-
-Passing transport information to Spark jobs
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-OpenLineage integration can automatically inject Airflow's transport
information into Spark application properties,
-for :ref:`supported Operators <supported_classes:openlineage>`.
-It allows Spark integration to send events to the same backend as Airflow
integration without manual configuration.
-See `Scheduling from Airflow
<https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.
-
-.. note::
-
- If any of the ``spark.openlineage.transport*`` properties are manually
specified in the Spark job configuration, the integration will refrain from
injecting transport properties to ensure that manually provided values are
preserved.
-
-You can enable this automation by setting ``spark_inject_transport_info``
option to ``true`` in Airflow configuration.
-
-.. code-block:: ini
-
- [openlineage]
- transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
- spark_inject_transport_info = true
-
-``AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO`` environment variable is
an equivalent.
-
-.. code-block:: ini
-
- AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
-
-
-Passing parent information to Airflow DAG
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To enable full OpenLineage lineage tracking across dependent DAGs, you can
pass parent and root job information
-through the DAG's ``dag_run.conf``. When a DAG run configuration includes an
``openlineage`` section with valid metadata,
-this information is automatically parsed and converted into DAG run's
``parentRunFacet``, from which the root information
-is also propagated to all tasks. If no DAG run ``openlineage`` configuration
is provided, the DAG run will not contain
-``parentRunFacet`` and root of all tasks will default to Dag run.
-
-The ``openlineage`` dict in conf should contain the following keys:
-
-
-*(all three values must be included to create a parent reference)*
-
-- **parentRunId** — the unique run ID (uuid) of the direct parent job
-- **parentJobName** — the name of the parent job
-- **parentJobNamespace** — the namespace of the parent job
-
-*(all three values must be included to create a root reference, otherwise
parent will be used as root)*
-
-- **rootParentRunId** — the run ID (uuid) of the top-level (root) job
-- **rootParentJobName** — the name of the top-level (root) job
-- **rootParentJobNamespace** — the namespace of the top-level (root) job
-
-.. note::
-
- We highly recommend providing all six OpenLineage identifiers (parent and
root) to ensure complete lineage tracking. If the root information is missing,
the parent set will be used as the root; if any of the three parent fields are
missing, no parent facet will be created. Partial or mixed configurations are
not supported - either all three parent or all three root values must be
provided together.
-
-
-Example:
-
-.. code-block:: shell
-
- curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
- -H "Content-Type: application/json" \
- -d '{
- "logical_date": "2019-08-24T14:15:22Z",
- "conf": {
- "openlineage": {
- "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
- "parentJobNamespace": "prod_biz",
- "parentJobName": "get_files",
- "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
- "rootParentJobNamespace": "prod_analytics",
- "rootParentJobName": "generate_report_sales_e2e"
- }
- }
- }'
-
-
-Troubleshooting
-===============
-
-See :ref:`troubleshooting:openlineage` for details on how to troubleshoot
OpenLineage.
-
-
-Adding support for custom Operators
-===================================
-
-If you want to add OpenLineage coverage for particular Operator, take a look
at :ref:`guides/developer:openlineage`
-
-
-Where can I learn more?
-=======================
-
-- Check out `OpenLineage website <https://openlineage.io>`_.
-- Visit our `GitHub repository <https://github.com/OpenLineage/OpenLineage>`_.
-- Watch multiple `talks <https://openlineage.io/resources#conference-talks>`_
about OpenLineage.
-
-Feedback
-========
-
-You can reach out to us on `slack <http://bit.ly/OpenLineageSlack>`_ and leave
us feedback!
-
-
-How to contribute
-=================
-
-We welcome your contributions! OpenLineage is an Open Source project under
active development, and we'd love your help!
-
-Sounds fun? Check out our `new contributor guide
<https://github.com/OpenLineage/OpenLineage/blob/main/CONTRIBUTING.md>`_ to get
started.
diff --git a/providers/openlineage/docs/index.rst
b/providers/openlineage/docs/index.rst
index 856262f87b0..a15923c4bec 100644
--- a/providers/openlineage/docs/index.rst
+++ b/providers/openlineage/docs/index.rst
@@ -35,10 +35,11 @@
:caption: Guides
Intro <guides/structure>
- User <guides/user>
- Developer <guides/developer>
Supported classes <supported_classes>
- Macros <macros>
+ Custom Operators <guides/developer>
+ Job Hierarchy & Macros <macros>
+ Spark Integration <spark>
+ Troubleshooting <troubleshooting>
.. toctree::
:hidden:
@@ -78,7 +79,8 @@
apache-airflow-providers-openlineage package
------------------------------------------------------
-`OpenLineage <https://openlineage.io/>`__
+`OpenLineage <https://openlineage.io/docs/>`__ is an open framework for data
lineage collection.
+At its core it is an extensible specification that systems can use to
interoperate with lineage metadata.
Release: 2.10.0
diff --git a/providers/openlineage/docs/macros.rst
b/providers/openlineage/docs/macros.rst
index c07d20824ec..ce588f96c63 100644
--- a/providers/openlineage/docs/macros.rst
+++ b/providers/openlineage/docs/macros.rst
@@ -17,23 +17,317 @@
.. _howto/macros:openlineage:
-OpenLineage Macros
-==================
+OpenLineage Job Hierarchy & Macros
+===================================
Macros included in OpenLineage plugin get integrated to Airflow's main
collections and become available for use.
+Job Hierarchy in OpenLineage
+-----------------------------
+
+When you need to establish relationships between different jobs (e.g., between
DAGs, or between Airflow tasks and external systems),
+you may need to explicitly pass parent job information. The following sections
describe different scenarios and whether user action is required.
+
+DAG to Task Hierarchy
+^^^^^^^^^^^^^^^^^^^^^^
+
+Apache Airflow features an inherent job hierarchy: DAGs, large and
independently schedulable units, comprise smaller,
+executable tasks. OpenLineage reflects this structure in its Job Hierarchy
model. Within a single DAG, OpenLineage
+automatically tracks the hierarchy between the DAG and its tasks -
TaskInstance events automatically include
+a ``ParentRunFacet`` that references the originating DAG run as the parent job.
+
+**User Action Required:** None. OpenLineage automatically establishes the
parent-child relationship between DAG runs and their task instances.
+
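+For illustration, the run facets of a task-level event carry a parent facet similar to the
+sketch below (the values shown here are hypothetical; see the ParentRunFacet spec for the exact shape):
+
+.. code-block:: json
+
+   "run": {
+     "facets": {
+       "parent": {
+         "job": {"namespace": "airflow", "name": "my_dag"},
+         "run": {"runId": "019b6ff1-f2f0-79bf-a797-0bbe6983c753"}
+       }
+     }
+   }
+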
+
+TriggerDagRunOperator
+^^^^^^^^^^^^^^^^^^^^^^
+
+The
:class:`~airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator`
triggers a DAG run for a specified Dag ID.
+
+**OpenLineage Behavior:**
+
+Since ``apache-airflow-providers-standard==1.10.0``, by default, the operator
automatically injects OpenLineage parent
+job information into the triggered DAG run's configuration. This creates a
parent-child relationship between the
+triggering task and the triggered DAG run - the triggered DAG Run events will
have a ``ParentRunFacet`` referencing the
+triggering task.
+
+Apart from the above, the OpenLineage COMPLETE event for the triggering task includes the following
+operator-specific attributes in the ``AirflowRunFacet``:
+
+- ``trigger_dag_id`` - The Dag ID of the DAG being triggered
+- ``trigger_run_id`` - The Dag Run ID of the DagRun being triggered
+
+**User Action Required:** None. The operator automatically handles parent
information injection.
+
+**To disable automatic injection**, pass ``openlineage_inject_parent_info=False``:
+
+.. code-block:: python
+
+ TriggerDagRunOperator(
+ task_id="trigger_downstream",
+ trigger_dag_id="downstream_dag",
+ openlineage_inject_parent_info=False, # Disable automatic injection
+ )
+
+
+Triggering DAGs via API
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+When triggering a DAG run via the Airflow REST API, you can manually pass
parent and root job information through the
+DAG run's ``conf`` parameter. When a DAG run configuration includes an
``openlineage`` section with valid metadata,
+this information is automatically parsed and converted into the
``parentRunFacet`` in DAG run's events, from which the
+root information is also propagated to all tasks within that DAG run.
+
+If no DAG run ``openlineage`` configuration is provided, the DAG run will not
contain a ``parentRunFacet``,
+and the root of all tasks will default to the DAG run itself.
+
+The ``openlineage`` dictionary in the DAG run configuration should contain the
following keys:
+
+**Parent job information** (all three values must be included to create a
parent reference):
+
+- **parentRunId** — the unique run ID (UUID) of the direct parent job
+- **parentJobName** — the name of the parent job
+- **parentJobNamespace** — the namespace of the parent job
+
+**Root job information** (all three values must be included to create a root
reference; otherwise, parent will be used as root):
+
+- **rootParentRunId** — the run ID (UUID) of the top-level (root) job
+- **rootParentJobName** — the name of the top-level (root) job
+- **rootParentJobNamespace** — the namespace of the top-level (root) job
+
+.. note::
+
+ We highly recommend providing all six OpenLineage identifiers (parent and
root) to ensure complete lineage tracking.
+ If the root information is missing, the parent set will be used as the
root. If any of the three parent fields are missing,
+ no parent facet will be created. Partial or mixed configurations are not
supported—either all three parent values or all three
+ root values must be provided together.
+
+Example:
+
+.. code-block:: shell
+
+ curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "logical_date": "2019-08-24T14:15:22Z",
+ "conf": {
+ "openlineage": {
+ "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
+ "parentJobNamespace": "prod_biz",
+ "parentJobName": "get_files",
+ "rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
+ "rootParentJobNamespace": "prod_analytics",
+ "rootParentJobName": "generate_report_sales_e2e"
+ }
+ }
+ }'
+
+**User Action Required:** Yes - you must manually include the parent and root
job information in the DAG run ``conf``.
+
+ExternalTaskSensor
+^^^^^^^^^^^^^^^^^^^
+
+The :class:`~airflow.providers.standard.sensors.external_task.ExternalTaskSensor` waits for one or more tasks in another DAG.
+
+**OpenLineage Behavior:**
+
+OpenLineage events for the sensor task include the following operator-specific
attributes in the ``AirflowRunFacet``:
+
+- ``external_dag_id`` - The DAG ID of the external task being waited for
+- ``external_task_id`` - The task ID of the external task being waited for
(when waiting for a single task)
+- ``external_task_ids`` - List of task IDs being waited for (when waiting for
multiple tasks)
+- ``external_task_group_id`` - The task group ID being waited for (when
waiting for a task group)
+- ``external_dates_filter`` - The date filter applied when checking for
external task completion
+
+These attributes provide visibility into cross-DAG dependencies but do not create a parent-child job relationship.
+
+**User Action Required:** No automatic parent relationship is created. If you need to track this relationship in
+OpenLineage, consider using TriggerDagRunOperator, manually passing parent information via the API, or using the
+attributes above to construct the relationship yourself.
+
+Airflow Assets
+^^^^^^^^^^^^^^^
+
+Airflow Assets allow you to schedule DAGs based on when tasks update assets
(data dependencies). When a task updates
+an asset and another DAG is scheduled based on that asset, OpenLineage tracks
the asset relationship.
+
+**OpenLineage Behavior:**
+
+Tasks that produce assets (using ``outlets=[Asset(...)]``) are tracked by OpenLineage as producing those assets,
+and DAGs scheduled on assets (using ``schedule=[Asset(...)]``) are tracked as consuming them.
+
+When a DAG run is triggered by asset consumption, OpenLineage adds a
``JobDependenciesRunFacet`` to the DAG run's events
+(START and COMPLETE/FAIL). This facet contains upstream job dependencies
showing all consumed asset events and OpenLineage
+job/run information of the asset-producing jobs. Each dependency includes:
+
+- Job identifier (OpenLineage namespace and name) of the producing task
+- Run identifier (OpenLineage run ID) of the producing task instance, if
available
+- Dependency type: ``IMPLICIT_ASSET_DEPENDENCY``
+- Asset events information: details about all asset events consumed from that
job, including asset URI, asset ID,
+ source DAG run ID, and other metadata
+
+Note that a ``ParentRunFacet`` is **not** added to consuming DAG run events.
Instead, the ``JobDependenciesRunFacet``
+provides a more flexible representation that can handle multiple upstream
dependencies (when a DAG consumes assets from
+multiple producing tasks) and preserves detailed information about each asset
event.
+
+The asset relationship creates a data lineage connection in OpenLineage,
showing which tasks produce and consume assets.
+
+.. code-block:: json
+
+ "run": {
+ "facets": {
+ "jobDependencies": {
+ "upstream": [
+ {
+ "job": {
+ "name": "dag_asset_1_producer.produce_dataset_1",
+ "namespace": "airflow"
+ },
+ "run": {
+ "runId": "019b6ff1-f2f0-79bf-a797-0bbe6983c753"
+ },
+ "airflow": {
+ "asset_events": [
+ {
+ "asset_id": 1,
+ "asset_uri": "s3://first-bucket/ds1.csv",
+ "dag_run_id": "manual__2025-12-30T15:48:06+00:00",
+ "asset_event_id": 1
+ }
+ ]
+ },
+ "dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
+ },
+ {
+ "job": {
+ "name": "dag_asset_1_producer.produce_dataset_1",
+ "namespace": "airflow"
+ },
+ "run": {
+ "runId": "019b6ff4-4c80-7b5f-9f35-da28a44030df"
+ },
+ "airflow": {
+ "asset_events": [
+ {
+ "asset_id": 1,
+ "asset_uri": "s3://first-bucket/ds1.csv",
+ "dag_run_id": "manual__2025-12-30T15:50:40+00:00",
+ "asset_event_id": 2
+ }
+ ]
+ },
+ "dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
+ },
+ {
+ "job": {
+ "name": "dag_asset_2_producer.produce_dataset_2",
+ "namespace": "airflow"
+ },
+ "run": {
+ "runId": "019b6ff4-7f48-7ee5-aacb-a88072516b1e"
+ },
+ "airflow": {
+ "asset_events": [
+ {
+ "asset_id": 2,
+ "asset_uri": "gs://second-bucket/ds2.xlsx",
+ "dag_run_id": "manual__2025-12-30T15:50:53+00:00",
+ "asset_event_id": 3
+ }
+ ]
+ },
+ "dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
+ }
+ ],
+ "downstream": []
+ }
+ }
+ }
+
+
+**User Action Required:** None. Relationships are automatically tracked for
data lineage.
+
+
+Manually Emitting Asset Events via API
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When you manually emit asset events via the Airflow REST API (e.g., when
assets are updated outside of Airflow tasks),
+you can include OpenLineage job information in the asset event's ``extra``
field. This allows OpenLineage to
+track the relationship between the asset producer and consumers, even when the
asset event is not directly linked to
+an Airflow TaskInstance.
+
+**OpenLineage Behavior:**
+
+When an asset event is manually created via API without a TaskInstance
reference,
+OpenLineage checks (after TaskInstance and AssetEvent source fields) for
parent job information in
+``asset_event.extra["openlineage"]``. If present, this information is used to
create job dependencies in the
+``JobDependenciesRunFacet`` in events of DAG runs that consume the asset.
+
+**Required fields in ``asset_event.extra["openlineage"]``:**
+
+- **parentJobName** (required) - The name of the parent job that produced the
asset
+- **parentJobNamespace** (required) - The namespace of the parent job
+- **parentRunId** (optional) - The run ID (UUID) of the parent job execution.
If provided, must be a valid UUID format
+
+**Example API call:**
+
+.. code-block:: shell
+
+ curl -X POST "http://<AIRFLOW_HOST>/api/v2/assets/events" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "asset_id": 3,
+ "extra": {
+ "openlineage": {
+ "parentJobName": "external_system.data_processor",
+ "parentJobNamespace": "prod_etl",
+ "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ }
+ }
+ }'
+
+**User Action Required:** Yes - you must manually include the OpenLineage job
information in the asset event's
+``extra`` field when emitting events via API.
+
+Child Jobs Outside Airflow
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When Airflow tasks trigger external systems (e.g., Spark applications,
external APIs, other schedulers), those child
+jobs need to be explicitly configured with parent job information to establish
the hierarchy in OpenLineage.
+
+**User Action Required:** Yes - you must use macros (see below) or automatic
injection mechanisms (e.g., for Spark)
+to pass parent job information to the child job.
+
+
+Preserving Job Hierarchy with Macros
+--------------------------------------
+
+To establish a correct job hierarchy in lineage tracking, child jobs (e.g.,
Spark applications, external systems, or downstream DAGs)
+need to know about their parent job (the Airflow task that triggered them).
This allows the child job's OpenLineage integration to
+automatically add a ``ParentRunFacet`` to its OpenLineage events, linking the
child job to its originating Airflow job in the lineage graph.
+
+The macros provided by the OpenLineage provider allow you to pass this parent
job information from Airflow tasks to child jobs.
+The ``lineage_*`` macros describe the Airflow task itself, which from the
child job's perspective is the parent.
+The ``lineage_root_*`` macros forward the Airflow task's root information into
the child job, allowing the child job to maintain
+the complete job hierarchy and information about the root of the job hierarchy.
+
They can be invoked as a Jinja template, e.g.
Lineage job & run macros
-------------------------
+^^^^^^^^^^^^^^^^^^^^^^^^^
These macros:
- * ``lineage_job_namespace()``
- * ``lineage_job_name(task_instance)``
- * ``lineage_run_id(task_instance)``
-allow injecting pieces of run information of a given Airflow task into the
arguments sent to a remote processing job.
-For example, ``SparkSubmitOperator`` can be set up like this:
+ * ``lineage_job_namespace()`` - Returns the OpenLineage namespace configured for the Airflow instance
+ * ``lineage_job_name(task_instance)`` - Returns OpenLineage job name for a
given task_instance
+ * ``lineage_run_id(task_instance)`` - Returns the generated OpenLineage run
id for a given task_instance
+
+describe the Airflow task and should be used as **parent** information when
configuring child jobs. From the child job's perspective,
+the Airflow task is the parent.
+
+**Example: Using macros with Spark applications**
+
+When triggering Spark jobs from Airflow, you can pass parent job information
using these macros:
.. code-block:: python
@@ -41,22 +335,75 @@ For example, ``SparkSubmitOperator`` can be set up like
this:
task_id="my_task",
application="/script.py",
conf={
- # separated components
"spark.openlineage.parentJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
},
)
-Lineage parent id
------------------
+**Example: Using macros with other child jobs**
+
+These macros work with any child job that accepts parent job information. For
example, you might pass this information
+to external systems, downstream DAGs, or other processing frameworks:
+
+.. code-block:: python
+
+ PythonOperator(
+ task_id="trigger_external_job",
+ python_callable=call_external_api,
+ op_kwargs={
+ "parent_job_namespace": "{{
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+ "parent_job_name": "{{
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+ "parent_run_id": "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+ },
+ )
+
+
+Lineage root macros
+^^^^^^^^^^^^^^^^^^^^
+
+These macros:
+
+ * ``lineage_root_job_namespace(task_instance)`` - Returns OpenLineage
namespace of root job of a given task_instance
+ * ``lineage_root_job_name(task_instance)`` - Returns OpenLineage job name of
root job of a given task_instance
+ * ``lineage_root_run_id(task_instance)`` - Returns OpenLineage run ID of
root run of a given task_instance
+
+forward the Airflow task's root information into the child job and should be
used as **root** information when configuring child jobs.
+This allows the child job to maintain the complete job hierarchy, especially
in scenarios where tasks are executed as part of a larger workflow.
+
+**Example: Using root macros with Spark applications**
+
+.. code-block:: python
+
+ SparkSubmitOperator(
+ task_id="my_task",
+ application="/script.py",
+ conf={
+ "spark.openlineage.rootJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
+ "spark.openlineage.rootJobName": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
+ "spark.openlineage.rootRunId": "{{
macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
+ },
+ )
+
+Joined identifiers
+^^^^^^^^^^^^^^^^^^^
+
+Instead of passing separate components, you can use combined macros that
return all information in a single string.
+These macros are useful when you need to pass the complete identifier to a
child job in one parameter.
+
+The ``lineage_parent_id(task_instance)`` macro combines the parent information
(namespace, job name, and run id)
+into one string structured as ``{namespace}/{job_name}/{run_id}``. This
represents the Airflow task and should be used
+as parent information when configuring child jobs.
-Same information, but compacted to one string, can be passed using
``linage_parent_id(task_instance)`` macro:
+Similarly, the ``lineage_root_parent_id(task_instance)`` macro combines the
root information (root namespace, root job name,
+and root run id) into one string structured as
``{namespace}/{job_name}/{run_id}``. This forwards the Airflow task's root
+information and should be used as root information when configuring child jobs.
.. code-block:: python
def my_task_function(templates_dict, **kwargs):
parent_job_namespace, parent_job_name, parent_run_id =
templates_dict["parentRun"].split("/")
+ root_job_namespace, root_job_name, root_run_id =
templates_dict["rootRun"].split("/")
...
@@ -64,9 +411,37 @@ Same information, but compacted to one string, can be
passed using ``linage_pare
task_id="render_template",
python_callable=my_task_function,
templates_dict={
- # joined components as one string `<namespace>/<name>/<run_id>`
+ # Parent information as one string `<namespace>/<name>/<run_id>`
"parentRun": "{{
macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
+ # Root information as one string `<namespace>/<name>/<run_id>`
+ "rootRun": "{{
macros.OpenLineageProviderPlugin.lineage_root_parent_id(task_instance) }}",
},
provide_context=False,
dag=dag,
)
+
+
+Example
+^^^^^^^^
+
+When you need to pass both parent and root lineage information to a child job,
you can combine all macros
+in a single operator configuration. This example shows how to use both parent
and root macros with a Spark application:
+
+.. code-block:: python
+
+ SparkSubmitOperator(
+ task_id="process_data",
+ application="/path/to/spark/app.py",
+ conf={
+ # Parent lineage information
+ "spark.openlineage.parentJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+ "spark.openlineage.parentJobName": "{{
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+ "spark.openlineage.parentRunId": "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+ # Root lineage information
+ "spark.openlineage.rootJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
+ "spark.openlineage.rootJobName": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
+ "spark.openlineage.rootRunId": "{{
macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
+ },
+ )
+
+For more Spark-specific examples and automatic injection options, see
:ref:`spark:openlineage`.
diff --git a/providers/openlineage/docs/spark.rst
b/providers/openlineage/docs/spark.rst
new file mode 100644
index 00000000000..f4301525fee
--- /dev/null
+++ b/providers/openlineage/docs/spark.rst
@@ -0,0 +1,280 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _spark:openlineage:
+
+OpenLineage Spark Integration
+==============================
+
+The OpenLineage Spark integration is a **separate package** from the Airflow
OpenLineage provider.
+For Spark applications to send lineage (regardless of whether they are triggered from Airflow or not),
+you need to have the Spark OpenLineage integration enabled and configured in
your Spark application.
+The Airflow OpenLineage provider can, in some cases, help facilitate this by
automatically injecting necessary configuration into Spark jobs.
+Usually, the same parameters that are passed to spark-submit can also be
supplied directly from Airflow and other schedulers,
+allowing for seamless configuration and execution of Spark jobs.
+
+Understanding different integrations
+-------------------------------------
+
+**Airflow OpenLineage Provider**
+
+Tracks lineage for Airflow DAGs and tasks. It emits OpenLineage events when
Airflow tasks execute,
+capturing information about the Airflow workflow itself. It must be installed
and enabled in your Airflow environment.
+
+**Spark OpenLineage Integration**
+
+Tracks lineage for Spark applications. It uses the
``OpenLineageSparkListener`` to monitor Spark
+execution and extract metadata about datasets, jobs, and their dependencies.
This integration
+must be enabled in your Spark application independently.
+
+**When Spark jobs are triggered from Airflow, both integrations work together**
+
+- The Airflow OpenLineage provider tracks the Airflow task that triggers the
Spark job
+- The Spark integration tracks the actual Spark application execution
+- Parent job information injected into Spark application by Airflow task links
the two jobs together
+
+For detailed information about the Spark integration, see the
+`OpenLineage Spark documentation
<https://openlineage.io/docs/integrations/spark/>`_.
+
+Enabling Spark OpenLineage Integration
+---------------------------------------
+
+To enable OpenLineage in your Spark application, you need to install and
configure the OpenLineage Spark
+integration. **This is a separate step from enabling the Airflow OpenLineage
provider.**
+For detailed installation instructions, including different installation
methods, see the
+`OpenLineage Spark Installation documentation
<https://openlineage.io/docs/integrations/spark/installation>`_.
+After installation, you'll need to configure the Spark OpenLineage listener
and other settings.
+For complete Spark configuration options, see the
+`Spark Configuration documentation
<https://openlineage.io/docs/integrations/spark/configuration/>`_.
+
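+A minimal setup might look like the sketch below, assuming the listener is distributed via ``--packages``
+(the package coordinates, Scala binary version, backend URL and namespace are placeholders to adjust for your environment):
+
+.. code-block:: bash
+
+    spark-submit \
+      --packages "io.openlineage:openlineage-spark_2.13:1.41.0" \
+      --conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener" \
+      --conf "spark.openlineage.transport.type=http" \
+      --conf "spark.openlineage.transport.url=http://openlineage-backend:5000" \
+      --conf "spark.openlineage.namespace=my-spark-namespace" \
+      /path/to/spark/app.py
+
+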
+Preserving Job Hierarchy
+-------------------------
+
+To establish a correct job hierarchy in lineage tracking, the Spark
application needs to know about
+its parent job (e.g., the Airflow task that triggered it). This allows the
Spark integration to automatically
+add a ``ParentRunFacet`` to the Spark application-level OpenLineage event,
linking the Spark job to its
+originating Airflow job in the lineage graph.
+
+For a general explanation of why preserving job hierarchy is important and how
it works, see :ref:`howto/macros:openlineage`.
+
+The following Spark properties are required for automatic creation of the
``ParentRunFacet``:
+
+- ``spark.openlineage.parentJobNamespace`` - Namespace of the parent job
(Airflow task)
+- ``spark.openlineage.parentJobName`` - Job name of the parent job (Airflow
task)
+- ``spark.openlineage.parentRunId`` - Run ID of the parent job (Airflow task)
+
+Additionally, the following properties (available in Spark integration version 1.31.0 and later) allow
+easier connection of the root (top-level parent) job to the child jobs:
+
+- ``spark.openlineage.rootParentJobNamespace`` - Namespace of the root job
(e.g., Airflow DAG)
+- ``spark.openlineage.rootParentJobName`` - Job name of the root job (e.g.,
Airflow DAG)
+- ``spark.openlineage.rootParentRunId`` - Run ID of the root job (e.g.,
Airflow DAG)
+
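+As an illustration, when you configure the Spark job yourself rather than relying on the automatic injection
+described below, these properties can be passed directly to ``spark-submit`` (all values below are placeholders):
+
+.. code-block:: bash
+
+    spark-submit \
+      --conf "spark.openlineage.parentJobNamespace=airflow-namespace" \
+      --conf "spark.openlineage.parentJobName=my_dag.my_task" \
+      --conf "spark.openlineage.parentRunId=3bb703d1-09c1-4a42-8da5-35a0b3216072" \
+      --conf "spark.openlineage.rootParentJobNamespace=airflow-namespace" \
+      --conf "spark.openlineage.rootParentJobName=my_dag" \
+      --conf "spark.openlineage.rootParentRunId=9d3b14f7-de91-40b6-aeef-e887e2c7673e" \
+      /path/to/spark/app.py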
+
+Automatic Injection
+-------------------
+
+The Airflow OpenLineage provider can automatically inject parent job
information and transport configuration
+into Spark application properties when Spark jobs are submitted from Airflow.
This eliminates the need
+to manually configure these properties in every Spark operator.
+
+Automatic injection is supported for the following operators:
+
+- :class:`~airflow.providers.apache.livy.operators.livy.LivyOperator`
+-
:class:`~airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator`
+-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
+-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
+-
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
+
+
+.. _options:spark_inject_parent_job_info:
+
+Enabling Automatic Parent Job Information Injection
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Airflow OpenLineage provider can automatically inject Airflow's task
information into Spark application properties as parent job information.
+
+This configuration serves as the default behavior for all Operators that
support automatic Spark properties injection,
+unless it is explicitly overridden at the Operator level. To prevent a
specific Operator from injecting the parent job information while
+allowing all other supported Operators to do so by default,
``openlineage_inject_parent_job_info=False``
+can be explicitly provided to that specific Operator.
+
+You can enable this automation by setting ``spark_inject_parent_job_info``
option to ``true`` in Airflow configuration:
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
+ spark_inject_parent_job_info = true
+
+The ``AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO`` environment variable is equivalent:
+
+.. code-block:: bash
+
+ export AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
+
+When enabled, the following properties are automatically injected into Spark
job configuration:
+
+- ``spark.openlineage.parentJobNamespace``
+- ``spark.openlineage.parentJobName``
+- ``spark.openlineage.parentRunId``
+- ``spark.openlineage.rootParentJobNamespace``
+- ``spark.openlineage.rootParentJobName``
+- ``spark.openlineage.rootParentRunId``
+
+.. note::
+
+ If any of the ``spark.openlineage.parent*`` properties are manually
specified in the Spark job
+ configuration, the integration will refrain from injecting parent job
properties to ensure that manually
+ provided values are preserved.
+
+
+.. _options:spark_inject_transport_info:
+
+Enabling Automatic Transport Information Injection
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Airflow OpenLineage provider can automatically inject Airflow's transport
information into Spark application properties.
+When enabled, the transport configuration from Airflow (URL, authentication,
etc.) is automatically
+injected into Spark job configuration, allowing Spark OpenLineage integration
to send events to the same OpenLineage backend
+as Airflow without manual configuration.
+
+.. caution::
+
+ Currently, only HTTP transport is supported for automatic transport
injection (with api_key authentication, if configured).
+
+
+.. note::
+
+ Ensure that the OpenLineage backend is accessible from the Spark execution
environment.
+ Depending on where your Spark jobs run (e.g., on-premises clusters, cloud
environments, isolated networks),
+   you may need to configure network access, proxies, or firewall rules to
+   allow Spark applications to reach the same backend as the Airflow environment.
+
+This configuration serves as the default behavior for all Operators that
support automatic Spark properties injection,
+unless it is explicitly overridden at the Operator level. To prevent a
specific Operator from injecting the transport information while
+allowing all other supported Operators to do so by default,
``openlineage_inject_transport_info=False``
+can be explicitly provided to that specific Operator.
+
+You can enable this automation by setting ``spark_inject_transport_info``
option to ``true`` in Airflow configuration:
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
+ spark_inject_transport_info = true
+
+The ``AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO`` environment variable is equivalent:
+
+.. code-block:: bash
+
+ export AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
+
+.. note::
+
+ If any of the ``spark.openlineage.transport*`` properties are manually
specified in the Spark job configuration,
+ the integration will refrain from injecting transport properties to ensure
that manually provided values are preserved.
+
+
+Per-Operator Configuration
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can override the global configuration on a per-operator basis using
operator parameters.
+This allows you to customize the injection behavior for specific operators
while maintaining the default behavior for others:
+
+.. code-block:: python
+
+ SparkSubmitOperator(
+ task_id="my_task",
+ application="/path/to/app.py",
+ openlineage_inject_parent_job_info=True, # Override global setting
+ openlineage_inject_transport_info=False, # Disable for this operator
+ conf={
+ # Your Spark configuration
+ },
+ )
+
+
+
+Complete Example
+-----------------
+
+Here's a complete example using ``DataprocSubmitJobOperator`` with automatic
injection enabled:
+
+.. code-block:: python
+
+ from airflow.providers.google.cloud.operators.dataproc import
DataprocSubmitJobOperator
+
+ spark_job = DataprocSubmitJobOperator(
+ task_id="process_data",
+ project_id="my-project",
+ region="us-central1",
+ job={
+ "reference": {"project_id": "my-project"},
+ "placement": {"cluster_name": "my-cluster"},
+ "pyspark_job": {
+ "main_python_file_uri": "gs://bucket/my-spark-app.py",
+ "properties": {
+ # Spark OpenLineage listener and jar
+ "spark.extraListeners":
"io.openlineage.spark.agent.OpenLineageSparkListener",
+ "spark.jars.packages":
"io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:1.41.0",
+ # Transport configuration will be automatically injected
if spark_inject_transport_info is enabled
+ # Parent and root information will be automatically
injected if spark_inject_parent_job_info is enabled
+ },
+ },
+ },
+ dag=dag,
+ )
+
+With automatic injection enabled, the parent job information and transport
configuration are added
+automatically, so you only need to configure the Spark OpenLineage listener
and namespace.
+
+
+Manual Configuration Using Macros
+-----------------------------------
+
+If you need more control over the configuration that Airflow injects into the Spark application, you can use the OpenLineage macros.
+See :ref:`howto/macros:openlineage`.
+
+Example with manual configuration:
+
+.. code-block:: python
+
+ SparkSubmitOperator(
+ task_id="my_task",
+ application="/path/to/spark/app.py",
+ conf={
+ # Spark OpenLineage listener and packages
+ "spark.extraListeners":
"io.openlineage.spark.agent.OpenLineageSparkListener",
+ "spark.jars.packages":
"io.openlineage:openlineage-spark_${SCALA_BINARY_VERSION}:1.41.0",
+ # Spark OpenLineage namespace
+ "spark.openlineage.namespace": "my-spark-namespace",
+ # Transport configuration
+ "spark.openlineage.transport.type": "http",
+ "spark.openlineage.transport.url":
"http://openlineage-backend:5000",
+ "spark.openlineage.transport.endpoint": "api/v1/lineage",
+ "spark.openlineage.transport.auth.type": "api_key",
+ "spark.openlineage.transport.auth.apiKey": "your-api-key",
+ # Parent job information (using macros)
+ "spark.openlineage.parentJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
+ "spark.openlineage.parentJobName": "{{
macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
+ "spark.openlineage.parentRunId": "{{
macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
+ # Root parent job information (using macros)
+ "spark.openlineage.rootParentJobNamespace": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
+ "spark.openlineage.rootParentJobName": "{{
macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
+ "spark.openlineage.rootParentRunId": "{{
macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
+ },
+ )
diff --git a/providers/openlineage/docs/supported_classes.rst
b/providers/openlineage/docs/supported_classes.rst
index 33c98d0a79a..ba37a2a3c31 100644
--- a/providers/openlineage/docs/supported_classes.rst
+++ b/providers/openlineage/docs/supported_classes.rst
@@ -21,7 +21,7 @@
Supported classes
===================
-Below is a list of Operators and Hooks that support OpenLineage extraction,
along with specific DB types that are compatible with the
SQLExecuteQueryOperator.
+Below is a list of Operators and Hooks that support OpenLineage extraction,
along with specific DB types that are compatible with the supported SQL
operators.
.. important::
@@ -29,6 +29,26 @@ Below is a list of Operators and Hooks that support
OpenLineage extraction, alon
please be aware that our updating process is automated and may not always
capture everything accurately.
Detecting hook level lineage is challenging so make sure to double check
the information provided below.
+What does "supported operator" mean?
+-------------------------------------
+
+**All Airflow operators will automatically emit OpenLineage events** (unless explicitly disabled or skipped during
+scheduling, like EmptyOperator), regardless of whether they appear on the "supported" list.
+Every OpenLineage event will contain basic information such as:
+
+- Task and DAG run metadata (execution time, state, tags, parameters, owners,
description, etc.)
+- Job relationship (the DAG job that the task belongs to, upstream/downstream relationships between tasks in a DAG, etc.)
+- Error message (in case of task failure)
+- Airflow and OpenLineage provider versions
+
+**"Supported" operators provide additional metadata** that enhances the
lineage information:
+
+- **Input and output datasets** (sometimes with Column Level Lineage)
+- **Operator-specific details** that may include SQL query text and query IDs,
source code, job IDs from external systems (e.g., Snowflake or BigQuery job
ID), data quality metrics and other information.
+
+For example, a supported SQL operator will include the executed SQL query,
query ID, and input/output table information
+in its OpenLineage events. An unsupported operator will still appear in the
lineage graph, but without these details.
+
.. tip::
You can easily implement OpenLineage support for any operator. See
:ref:`guides/developer:openlineage`.
diff --git a/providers/openlineage/docs/troubleshooting.rst
b/providers/openlineage/docs/troubleshooting.rst
new file mode 100644
index 00000000000..b8e7781fa92
--- /dev/null
+++ b/providers/openlineage/docs/troubleshooting.rst
@@ -0,0 +1,165 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _troubleshooting:openlineage:
+
+Troubleshooting
+================
+
+.. attention::
+
+ OpenLineage is under active development. Before troubleshooting, please
**upgrade to the latest provider and client
+ version** and verify that the issue still occurs, as it may have already
been resolved in a newer release.
+
+1. Upgrade the provider and its core dependencies
+2. Enable debug settings
+3. Inspect events locally
+4. Perform a quick check of your setup
+5. Check for common symptoms and fixes
+6. Check for open bugs and issues in the provider and the client
+7. Gather all the information listed below that will help diagnose the issue
+8. If you are still facing the issue, open an issue on the provider repository
+9. If all else fails, ask for help on the OpenLineage slack channel (remember that it is a community channel
+   and not a support channel, and people are volunteering their time to help you)
+
+
+1. Upgrade the provider and its core dependencies
+--------------------------------------------------
+
+Upgrade the OpenLineage provider and the OpenLineage client. If you'd like to
+know the difference between the two, see :ref:`client_v_provider:openlineage`.
+
+.. code-block:: bash
+
+ pip install --upgrade apache-airflow-providers-openlineage openlineage-python
+
+Then verify the versions in use are the latest available:
+
+.. code-block:: bash
+
+ pip show apache-airflow-providers-openlineage openlineage-python | cat
+
+
+2. Enable debug settings
+-------------------------
+
+Enable debug logs by setting the logging level to DEBUG for both Airflow and
the OpenLineage client:
+
+- In Airflow, use the `logging_level configuration <https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#logging-level>`_
+  and set the logging level to DEBUG, e.g. by exporting the environment variable ``AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG``.
+- The OpenLineage client should automatically pick up the logging level from Airflow,
+  but you can also set it explicitly by exporting the environment variable ``OPENLINEAGE_LOG_LEVEL=DEBUG``.
+
+Enable :ref:`Debug Mode <config:openlineage__debug_mode>` so that the
DebugFacet (additional diagnostic info) is attached to events. It can
drastically increase the size of the events and logs, so this should only be
used temporarily.
+
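+For example, assuming your deployment is configured through environment variables, the settings described above
+could be exported like this (``AIRFLOW__OPENLINEAGE__DEBUG_MODE`` maps to the ``debug_mode`` option):
+
+.. code-block:: bash
+
+    export AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
+    export OPENLINEAGE_LOG_LEVEL=DEBUG
+    export AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
+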
+
+3. Inspect events locally
+--------------------------
+
+With debug logs enabled, raw OpenLineage events will be logged before being emitted. Check the logs for OpenLineage events.
+
+You can also use a simple transport such as ``ConsoleTransport`` to print events to task logs,
+or ``FileTransport`` to save events to JSON files, e.g.
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "console"}
+
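+To write events to files instead, a file transport can be configured similarly; the exact options are
+described in the OpenLineage Python client docs, and the path below is only an example:
+
+.. code-block:: ini
+
+    [openlineage]
+    transport = {"type": "file", "log_file_path": "/tmp/openlineage/events.json", "append": true}
+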
+Alternatively, run `Marquez <https://marquezproject.ai/docs/quickstart>`_ locally to
+inspect whether events are emitted and received.
+
+
+
+4. Perform a quick check of your setup
+--------------------------------------
+
+- Verify the documentation of the provider and the `client <https://openlineage.io/docs/client/python>`_; maybe something has changed.
+- Configuration present: Ensure a working transport is configured. See
:ref:`Transport <config:openlineage__transport>`.
+- Disabled settings: Verify you did not disable the integration globally via
:ref:`Disabled <config:openlineage__disabled>` or selectively via
:ref:`Disabled for operators <config:openlineage__disabled_for_operators>` or
:ref:`Selective Enable <config:openlineage__selective_enable>` policy.
+- Extraction precedence: If inputs/outputs are missing, remember the order
described in :ref:`extraction_precedence:openlineage`.
+- Custom extractors registration: If using custom extractors, confirm they are
registered via :ref:`Extractors <config:openlineage__extractors>` and
importable by both Scheduler and Workers.
+- Environment variables: For legacy environments, note the
backwards-compatibility env vars in :ref:`Backwards Compatibility
<configuration_backwards_compatibility:openlineage>` (e.g.,
``OPENLINEAGE_URL``) but prefer Airflow config.
+
+
+5. Check for common symptoms and fixes
+--------------------------------------
+
+No events emitted at all:
+
+ - Ensure the provider is installed and at a supported Airflow version (see
provider "Requirements").
+ - Check :ref:`Disabled <config:openlineage__disabled>` is not set to
``true``.
+ - If using selective enablement, verify :ref:`Selective Enable
<config:openlineage__selective_enable>` and that the DAG/task is enabled via
``enable_lineage``.
+ - Confirm the OpenLineage plugin/listener is loaded in Scheduler/Worker logs.
+
+Events emitted but not received by backend
+
+ - Validate :ref:`Transport <config:openlineage__transport>` or :ref:`Config
Path <config:openlineage__config_path>`. See "Transport setup" in :ref:`the
configuration section <configuration:openlineage>` and "Configuration
precedence".
+ - Test with ``ConsoleTransport`` to rule out backend/network issues.
+ - Verify network connectivity, auth configuration, and endpoint values.
+
+Inputs/Outputs missing
+
+ - Review :ref:`extraction_precedence:openlineage` and ensure either custom
Extractor or Operator OpenLineage methods are implemented.
+ - For methods, follow best practices: import OpenLineage-related objects
inside the OpenLineage methods, not at module top level; avoid heavy work in
``execute`` that you need in ``_on_start``.
+ - For SQL-like operators, ensure relevant job IDs or runtime metadata are
available to enrich lineage in ``_on_complete``.
+
+Custom Extractor not working
+
+ - Confirm it's listed under :ref:`Extractors
<config:openlineage__extractors>` (or env var equivalent) and importable from
both Scheduler and Workers.
+ - Avoid cyclical imports: import from Airflow only within
``_execute_extraction``/``extract_on_complete``/``extract_on_failure``, and
guard type-only imports with ``typing.TYPE_CHECKING``.
+ - Unit test the Extractor to validate ``OperatorLineage`` contents; mock
external calls. See example tests referenced in
:ref:`custom_extractors:openlineage`.
+
+Custom Run Facets not present
+
+ - Register functions via :ref:`Custom Run Facets
<config:openlineage__custom_run_facets>`.
+ - Function signature must accept ``TaskInstance`` and ``TaskInstanceState``
and return ``dict[str, RunFacet]`` or ``None``.
+ - Avoid duplicate facet keys across functions; duplicates lead to
non-deterministic selection.
+ - Functions execute on START and COMPLETE/FAIL.
+
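+A minimal sketch of such a function is shown below (the facet and all names are hypothetical;
+adjust the ``RunFacet`` import to the one exposed by your client version):
+
+.. code-block:: python
+
+    import attrs
+    from openlineage.client.facet_v2 import RunFacet
+
+
+    @attrs.define
+    class OwnerRunFacet(RunFacet):
+        # Hypothetical facet carrying the task owner
+        owner: str
+
+
+    def get_owner_facet(task_instance, task_instance_state):
+        # Called on START and COMPLETE/FAIL; returning None skips the facet
+        if task_instance.task is None:
+            return None
+        return {"owner": OwnerRunFacet(owner=task_instance.task.owner)}
+
+Such a function would then be registered by adding its full import path to the ``custom_run_facets`` option.
+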
+Spark jobs missing parent linkage or transport settings
+
+ - If any ``spark.openlineage.parent*`` or ``spark.openlineage.transport*``
properties are explicitly set in the Spark job config, the integration will not
override them.
+  - If supported by your Operator, enable :ref:`options:spark_inject_parent_job_info` and
+    :ref:`options:spark_inject_transport_info`.
+
+Very large event payloads or serialization failures
+
+ - If :ref:`Include Full Task Info
<config:openlineage__include_full_task_info>` is enabled, events may become
large; consider disabling or trimming task parameters.
+ - :ref:`Disable Source Code <config:openlineage__disable_source_code>` can
reduce payloads for Python/Bash operators that include source code by default.
+
+
+6. Check for open bugs and issues in the provider and the client
+----------------------------------------------------------------
+
+Search the existing issue trackers before filing a new report:
+
+- Provider: https://github.com/apache/airflow/issues
+- Client: https://github.com/OpenLineage/OpenLineage/issues
+
+
+7. Gather crucial information
+-----------------------------
+
+- Airflow scheduler logs (with log level set to DEBUG, see Step 2 above)
+- Airflow worker (task) logs (with log level set to DEBUG, see Step 2 above)
+- OpenLineage events (with :ref:`Debug Mode <config:openlineage__debug_mode>`
enabled)
+- Airflow version, OpenLineage provider version and OpenLineage client version
+- Details on any custom deployment/environment modifications
+
+
+8. Open an issue on the provider repository
+-------------------------------------------
+
+If you are still facing the issue, please open an issue on the provider repository and include all the
+information **gathered in the previous step** together with a simple example of how to reproduce the issue.
+Do not paste your entire codebase; try to come up with a minimal code sample that demonstrates the
+problem - this increases the chances of the bug getting fixed quickly.
+
+
+9. Ask for help on the OpenLineage slack channel
+------------------------------------------------
+
+If all else fails, you can try asking for help on the OpenLineage `slack
channel <http://bit.ly/OpenLineageSlack>`_ (but remember that it is a community
channel and not a support channel, and people are volunteering their time to
help you).
diff --git a/providers/openlineage/provider.yaml
b/providers/openlineage/provider.yaml
index 157a6e5e4ec..0c9d5b98b40 100644
--- a/providers/openlineage/provider.yaml
+++ b/providers/openlineage/provider.yaml
@@ -19,7 +19,8 @@
package-name: apache-airflow-providers-openlineage
name: OpenLineage Airflow
description: |
- `OpenLineage <https://openlineage.io/>`__
+ `OpenLineage <https://openlineage.io/docs/>`__ is an open framework for data
lineage collection.
+ At its core it is an extensible specification that systems can use to
interoperate with lineage metadata.
state: ready
source-date-epoch: 1768335429
@@ -92,49 +93,16 @@ config:
openlineage:
description: |
This section applies settings for OpenLineage integration.
- More about configuration and its precedence can be found in the `user's
guide
-
<https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#transport-setup>`_.
options:
- disabled:
+ config_path:
description: |
- Disable sending events without uninstalling the OpenLineage Provider
by setting this to true.
- type: boolean
- example: ~
- default: "False"
+ Specify the path to the YAML configuration file.
+ This ensures backwards compatibility with passing config through the
`openlineage.yml` file.
version_added: ~
- disabled_for_operators:
- description: |
- Exclude some Operators from emitting OpenLineage events by passing a
string of semicolon separated
- full import paths of Operators to disable.
type: string
- example: "airflow.providers.standard.operators.bash.BashOperator;
- airflow.providers.standard.operators.python.PythonOperator"
+ example: "full/path/to/openlineage.yml"
default: ""
- version_added: 1.1.0
- selective_enable:
- description: |
- If this setting is enabled, OpenLineage integration won't collect
and emit metadata,
- unless you explicitly enable it per `DAG` or `Task` using
`enable_lineage` method.
- type: boolean
- default: "False"
- example: ~
- version_added: 1.7.0
- namespace:
- description: |
- Set namespace that the lineage data belongs to, so that if you use
multiple OpenLineage producers,
- events coming from them will be logically separated.
- version_added: ~
- type: string
- example: "my_airflow_instance_1"
- default: ~
- extractors:
- description: |
- Register custom OpenLineage Extractors by passing a string of
semicolon separated full import paths.
- type: string
- example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
- default: ~
- version_added: ~
custom_run_facets:
description: |
Register custom run facet functions by passing a string of semicolon
separated full import paths.
@@ -142,32 +110,22 @@ config:
example:
full.path.to.custom_facet_function;full.path.to.another_custom_facet_function
default: ''
version_added: 1.10.0
- config_path:
+ dag_state_change_process_pool_size:
description: |
- Specify the path to the YAML configuration file.
- This ensures backwards compatibility with passing config through the
`openlineage.yml` file.
- version_added: ~
- type: string
- example: "full/path/to/openlineage.yml"
- default: ""
- transport:
+ Number of processes to utilize for processing DAG state changes
+ in an asynchronous manner within the scheduler process.
+ default: "1"
+ example: ~
+ type: integer
+ version_added: 1.8.0
+ debug_mode:
description: |
- Pass OpenLineage Client transport configuration as a JSON string,
including the transport type
- and any additional options specific to that type, as described in
`OpenLineage docs
-
<https://openlineage.io/docs/client/python/#built-in-transport-types>`_.
-
- Currently supported types are:
-
- * HTTP
- * Kafka
- * Console
- * File
- * Composite
- * Custom
- type: string
- example: '{"type": "http", "url": "http://localhost:5000", "endpoint":
"api/v1/lineage"}'
- default: ""
- version_added: ~
+ If true, OpenLineage events will include information useful for
debugging - potentially
+ containing large fields e.g. all installed packages and their
versions.
+ default: "False"
+ example: ~
+ type: boolean
+ version_added: 1.11.0
disable_source_code:
description: |
Disable the inclusion of source code in OpenLineage events by
setting this to `true`.
@@ -177,18 +135,26 @@ config:
example: ~
type: boolean
version_added: ~
- dag_state_change_process_pool_size:
+ disabled:
description: |
- Number of processes to utilize for processing DAG state changes
- in an asynchronous manner within the scheduler process.
- default: "1"
+ Disable sending events without uninstalling the OpenLineage Provider
by setting this to true.
+ type: boolean
example: ~
- type: integer
- version_added: 1.8.0
+ default: "False"
+ version_added: ~
+ disabled_for_operators:
+ description: |
+ Exclude some Operators from emitting OpenLineage events by passing a
string of semicolon separated
+ full import paths of Operators to disable.
+ type: string
+ example: "airflow.providers.standard.operators.bash.BashOperator;
+ airflow.providers.standard.operators.python.PythonOperator"
+ default: ""
+ version_added: 1.1.0
execution_timeout:
description: |
- Maximum amount of time (in seconds) that OpenLineage can spend
executing metadata extraction.
- Note that other configurations, sometimes with higher priority, such
as
+ Maximum amount of time (in seconds) that OpenLineage can spend
executing metadata extraction for
+ task (on worker). Note that other configurations, sometimes with
higher priority, such as
`[core] task_success_overtime
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-success-overtime>`_,
may also affect how much time OpenLineage has for execution.
@@ -196,21 +162,42 @@ config:
example: ~
type: integer
version_added: 1.9.0
+ extractors:
+ description: |
+ Register custom OpenLineage Extractors by passing a string of
semicolon separated full import paths.
+ type: string
+ example: full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
+ default: ~
+ version_added: ~
include_full_task_info:
description: |
- If true, OpenLineage event will include full task info - potentially
containing large fields.
+ If true, OpenLineage task events include full serialized task
(operator) information.
+ By default, the AirflowRunFacet attached to task events contains
only a selected subset
+ of task parameters. With this flag on, all serializable task
parameters are sent
+ (excluding known non-serializable elements), which may significantly
increase event size.
+
+ Warning: By setting this variable to true, OpenLineage event can
potentially include elements that
+ are megabytes in size or larger, depending on the size of data you
pass to the task.
default: "False"
example: ~
type: boolean
version_added: 1.10.0
- debug_mode:
+ namespace:
description: |
- If true, OpenLineage events will include information useful for
debugging - potentially
- containing large fields e.g. all installed packages and their
versions.
+ Set namespace that the lineage data belongs to, so that if you use
multiple OpenLineage producers,
+ events coming from them will be logically separated.
+ version_added: ~
+ type: string
+ example: "my_airflow_instance_1"
+ default: ~
+ selective_enable:
+ description: |
+ If this setting is enabled, OpenLineage integration won't collect
and emit metadata,
+ unless you explicitly enable it per `DAG` or `Task` using
`enable_lineage` method.
+ type: boolean
default: "False"
example: ~
- type: boolean
- version_added: 1.11.0
+ version_added: 1.7.0
spark_inject_parent_job_info:
description: |
Automatically inject OpenLineage's parent job (namespace, job name,
run id) information into Spark
@@ -227,3 +214,12 @@ config:
default: "False"
example: ~
version_added: 2.1.0
+ transport:
+ description: |
+ Pass OpenLineage Client transport configuration as a JSON string,
including the transport type
+ and any additional options specific to that type, as described in
`OpenLineage docs
+
<https://openlineage.io/docs/client/python/configuration#transports>`_.
+ type: string
+ example: '{"type": "http", "url": "http://localhost:5000", "endpoint":
"api/v1/lineage"}'
+ default: ""
+ version_added: ~
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
index 3a26a47bdfd..edd960888ee 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
@@ -25,7 +25,7 @@ def get_provider_info():
return {
"package-name": "apache-airflow-providers-openlineage",
"name": "OpenLineage Airflow",
- "description": "`OpenLineage <https://openlineage.io/>`__\n",
+ "description": "`OpenLineage <https://openlineage.io/docs/>`__ is an
open framework for data lineage collection.\nAt its core it is an extensible
specification that systems can use to interoperate with lineage metadata.\n",
"integrations": [
{
"integration-name": "OpenLineage",
@@ -48,42 +48,14 @@ def get_provider_info():
],
"config": {
"openlineage": {
- "description": "This section applies settings for OpenLineage
integration.\nMore about configuration and its precedence can be found in the
`user's
guide\n<https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#transport-setup>`_.\n",
+ "description": "This section applies settings for OpenLineage
integration.\n",
"options": {
- "disabled": {
- "description": "Disable sending events without
uninstalling the OpenLineage Provider by setting this to true.\n",
- "type": "boolean",
- "example": None,
- "default": "False",
+ "config_path": {
+ "description": "Specify the path to the YAML
configuration file.\nThis ensures backwards compatibility with passing config
through the `openlineage.yml` file.\n",
"version_added": None,
- },
- "disabled_for_operators": {
- "description": "Exclude some Operators from emitting
OpenLineage events by passing a string of semicolon separated\nfull import
paths of Operators to disable.\n",
"type": "string",
- "example":
"airflow.providers.standard.operators.bash.BashOperator;
airflow.providers.standard.operators.python.PythonOperator",
+ "example": "full/path/to/openlineage.yml",
"default": "",
- "version_added": "1.1.0",
- },
- "selective_enable": {
- "description": "If this setting is enabled,
OpenLineage integration won't collect and emit metadata,\nunless you explicitly
enable it per `DAG` or `Task` using `enable_lineage` method.\n",
- "type": "boolean",
- "default": "False",
- "example": None,
- "version_added": "1.7.0",
- },
- "namespace": {
- "description": "Set namespace that the lineage data
belongs to, so that if you use multiple OpenLineage producers,\nevents coming
from them will be logically separated.\n",
- "version_added": None,
- "type": "string",
- "example": "my_airflow_instance_1",
- "default": None,
- },
- "extractors": {
- "description": "Register custom OpenLineage Extractors
by passing a string of semicolon separated full import paths.\n",
- "type": "string",
- "example":
"full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass",
- "default": None,
- "version_added": None,
},
"custom_run_facets": {
"description": "Register custom run facet functions by
passing a string of semicolon separated full import paths.\n",
@@ -92,19 +64,19 @@ def get_provider_info():
"default": "",
"version_added": "1.10.0",
},
- "config_path": {
- "description": "Specify the path to the YAML
configuration file.\nThis ensures backwards compatibility with passing config
through the `openlineage.yml` file.\n",
- "version_added": None,
- "type": "string",
- "example": "full/path/to/openlineage.yml",
- "default": "",
+ "dag_state_change_process_pool_size": {
+ "description": "Number of processes to utilize for
processing DAG state changes\nin an asynchronous manner within the scheduler
process.\n",
+ "default": "1",
+ "example": None,
+ "type": "integer",
+ "version_added": "1.8.0",
},
- "transport": {
- "description": "Pass OpenLineage Client transport
configuration as a JSON string, including the transport type\nand any
additional options specific to that type, as described in `OpenLineage
docs\n<https://openlineage.io/docs/client/python/#built-in-transport-types>`_.\n\nCurrently
supported types are:\n\n * HTTP\n * Kafka\n * Console\n * File\n *
Composite\n * Custom\n",
- "type": "string",
- "example": '{"type": "http", "url":
"http://localhost:5000", "endpoint": "api/v1/lineage"}',
- "default": "",
- "version_added": None,
+ "debug_mode": {
+ "description": "If true, OpenLineage events will
include information useful for debugging - potentially\ncontaining large fields
e.g. all installed packages and their versions.\n",
+ "default": "False",
+ "example": None,
+ "type": "boolean",
+ "version_added": "1.11.0",
},
"disable_source_code": {
"description": "Disable the inclusion of source code
in OpenLineage events by setting this to `true`.\nBy default, several Operators
(e.g. Python, Bash) will include their source code in the events\nunless
disabled.\n",
@@ -113,33 +85,54 @@ def get_provider_info():
"type": "boolean",
"version_added": None,
},
- "dag_state_change_process_pool_size": {
- "description": "Number of processes to utilize for
processing DAG state changes\nin an asynchronous manner within the scheduler
process.\n",
- "default": "1",
+ "disabled": {
+ "description": "Disable sending events without
uninstalling the OpenLineage Provider by setting this to true.\n",
+ "type": "boolean",
"example": None,
- "type": "integer",
- "version_added": "1.8.0",
+ "default": "False",
+ "version_added": None,
+ },
+ "disabled_for_operators": {
+ "description": "Exclude some Operators from emitting
OpenLineage events by passing a string of semicolon separated\nfull import
paths of Operators to disable.\n",
+ "type": "string",
+ "example":
"airflow.providers.standard.operators.bash.BashOperator;
airflow.providers.standard.operators.python.PythonOperator",
+ "default": "",
+ "version_added": "1.1.0",
},
"execution_timeout": {
- "description": "Maximum amount of time (in seconds)
that OpenLineage can spend executing metadata extraction.\nNote that other
configurations, sometimes with higher priority, such as\n`[core]
task_success_overtime\n<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-success-overtime>`_,\nmay
also affect how much time OpenLineage has for execution.\n",
+ "description": "Maximum amount of time (in seconds)
that OpenLineage can spend executing metadata extraction for a\ntask (on the worker).
Note that other configurations, sometimes with higher priority, such
as\n`[core]
task_success_overtime\n<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-success-overtime>`_,\nmay
also affect how much time OpenLineage has for execution.\n",
"default": "10",
"example": None,
"type": "integer",
"version_added": "1.9.0",
},
+ "extractors": {
+ "description": "Register custom OpenLineage Extractors
by passing a string of semicolon separated full import paths.\n",
+ "type": "string",
+ "example":
"full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass",
+ "default": None,
+ "version_added": None,
+ },
"include_full_task_info": {
- "description": "If true, OpenLineage event will
include full task info - potentially containing large fields.\n",
+ "description": "If true, OpenLineage task events
include full serialized task (operator) information.\nBy default, the
AirflowRunFacet attached to task events contains only a selected subset\nof
task parameters. With this flag on, all serializable task parameters are
sent\n(excluding known non-serializable elements), which may significantly
increase event size.\n\nWarning: By setting this variable to true, OpenLineage
event can potentially include elements that\na [...]
"default": "False",
"example": None,
"type": "boolean",
"version_added": "1.10.0",
},
- "debug_mode": {
- "description": "If true, OpenLineage events will
include information useful for debugging - potentially\ncontaining large fields
e.g. all installed packages and their versions.\n",
+ "namespace": {
+ "description": "Set namespace that the lineage data
belongs to, so that if you use multiple OpenLineage producers,\nevents coming
from them will be logically separated.\n",
+ "version_added": None,
+ "type": "string",
+ "example": "my_airflow_instance_1",
+ "default": None,
+ },
+ "selective_enable": {
+ "description": "If this setting is enabled,
the OpenLineage integration won't collect and emit metadata,\nunless you explicitly
enable it per `DAG` or `Task` using the `enable_lineage` method.\n",
+ "type": "boolean",
"default": "False",
"example": None,
- "type": "boolean",
- "version_added": "1.11.0",
+ "version_added": "1.7.0",
},
"spark_inject_parent_job_info": {
"description": "Automatically inject OpenLineage's
parent job (namespace, job name, run id) information into Spark\napplication
properties for supported Operators.\n",
@@ -155,6 +148,13 @@ def get_provider_info():
"example": None,
"version_added": "2.1.0",
},
+ "transport": {
+ "description": "Pass OpenLineage Client transport
configuration as a JSON string, including the transport type\nand any
additional options specific to that type, as described in `OpenLineage
docs\n<https://openlineage.io/docs/client/python/configuration#transports>`_.\n",
+ "type": "string",
+ "example": '{"type": "http", "url":
"http://localhost:5000", "endpoint": "api/v1/lineage"}',
+ "default": "",
+ "version_added": None,
+ },
},
}
},
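
Note: a hedged sketch of the `selective_enable` workflow described above.
With `[openlineage] selective_enable = True`, metadata is collected only for
DAGs or tasks that are explicitly opted in via `enable_lineage`; the import
path of that helper is an assumption based on the provider guides and may
differ between provider versions.

    # Hypothetical sketch: opting a DAG into lineage collection when
    # [openlineage] selective_enable is set to True. The import path of
    # enable_lineage is assumed from the provider guides.
    from airflow import DAG
    from airflow.providers.openlineage.utils.selective_enable import enable_lineage
    from airflow.providers.standard.operators.bash import BashOperator

    with DAG(dag_id="selective_lineage_example") as dag:
        BashOperator(task_id="emit_lineage", bash_command="echo hello")

    # Opt the whole DAG (or a single task) into OpenLineage event emission.
    enable_lineage(dag)
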