This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 37a576130b openlineage: add config to include 'full' task info based
on conf setting (#40589)
37a576130b is described below
commit 37a576130baf2ffafb597195802522e40f61c339
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Tue Jul 16 11:06:22 2024 +0200
openlineage: add config to include 'full' task info based on conf setting
(#40589)
* openlineage: add config to include 'full' task info based on conf setting
Signed-off-by: Maciej Obuchowski <[email protected]>
* Update docs/apache-airflow-providers-openlineage/guides/user.rst
Co-authored-by: Jed Cunningham
<[email protected]>
* Update docs/apache-airflow-providers-openlineage/guides/user.rst
Co-authored-by: Jed Cunningham
<[email protected]>
* Update docs/apache-airflow-providers-openlineage/guides/user.rst
Co-authored-by: Jed Cunningham
<[email protected]>
---------
Signed-off-by: Maciej Obuchowski <[email protected]>
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/providers/openlineage/conf.py | 5 +++
airflow/providers/openlineage/provider.yaml | 7 ++++
airflow/providers/openlineage/utils/utils.py | 21 +++++++++++-
.../guides/user.rst | 22 ++++++++++++
tests/providers/openlineage/plugins/test_utils.py | 39 +++++++++++++++++++++-
tests/providers/openlineage/test_conf.py | 33 ++++++++++++++++++
6 files changed, 125 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/openlineage/conf.py
b/airflow/providers/openlineage/conf.py
index 8f3c2b3571..0e4af0a70b 100644
--- a/airflow/providers/openlineage/conf.py
+++ b/airflow/providers/openlineage/conf.py
@@ -145,3 +145,8 @@ def execution_timeout() -> int:
"""[openlineage] execution_timeout."""
option = conf.get(_CONFIG_SECTION, "execution_timeout", fallback="")
return _safe_int_convert(str(option).strip(), default=10)
+
+
+@cache
+def include_full_task_info() -> bool:
+ return conf.getboolean(_CONFIG_SECTION, "include_full_task_info",
fallback="False")
diff --git a/airflow/providers/openlineage/provider.yaml
b/airflow/providers/openlineage/provider.yaml
index 9622226c7f..cfa001fdba 100644
--- a/airflow/providers/openlineage/provider.yaml
+++ b/airflow/providers/openlineage/provider.yaml
@@ -153,3 +153,10 @@ config:
example: ~
type: integer
version_added: 1.9.0
+ include_full_task_info:
+ description: |
+ If true, OpenLineage event will include full task info - potentially
containing large fields.
+ default: "False"
+ example: ~
+ type: boolean
+ version_added: 1.10.0
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index 4bc2356206..0484d11f53 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -272,6 +272,25 @@ class TaskInfo(InfoJsonEncodable):
}
+class TaskInfoComplete(TaskInfo):
+ """Defines encoding BaseOperator/AbstractOperator object to JSON used when
user enables full task info."""
+
+ includes = []
+ excludes = [
+ "_BaseOperator__instantiated",
+ "_dag",
+ "_hook",
+ "_log",
+ "_outlets",
+ "_inlets",
+ "_lock_for_execution",
+ "handler",
+ "params",
+ "python_callable",
+ "retry_delay",
+ ]
+
+
class TaskGroupInfo(InfoJsonEncodable):
"""Defines encoding TaskGroup object to JSON."""
@@ -300,7 +319,7 @@ def get_airflow_run_facet(
dag=DagInfo(dag),
dagRun=DagRunInfo(dag_run),
taskInstance=TaskInstanceInfo(task_instance),
- task=TaskInfo(task),
+ task=TaskInfoComplete(task) if conf.include_full_task_info() else
TaskInfo(task),
taskUuid=task_uuid,
)
}
diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst
b/docs/apache-airflow-providers-openlineage/guides/user.rst
index e7decec1f1..437da6d0fa 100644
--- a/docs/apache-airflow-providers-openlineage/guides/user.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -246,6 +246,28 @@ full import paths of Airflow Operators to disable as
``disabled_for_operators``
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator'
+Full Task Info
+^^^^^^^^^^^^^^
+
+By default, the OpenLineage integration's AirflowRunFacet - attached to the START
event for every task instance - does
+not contain the full serialized task information (parameters of the given operator),
but only includes selected parameters.
+
+However, users can configure the OpenLineage integration to include full task
information. By doing this, rather than
+serializing only a few known attributes, we exclude certain non-serializable
elements and send everything else.
+
+.. code-block:: ini
+
+ [openlineage]
+ transport = {"type": "http", "url": "http://example.com:5000", "endpoint":
"api/v1/lineage"}
+ include_full_task_info = true
+
+The ``AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO`` environment variable is
equivalent.
+
+.. warning::
+
+ When this variable is set to true, the OpenLineage integration does not control
the size of the events that are sent. An event can potentially include elements that are
megabytes in size or larger, depending on the size of the data you pass to the task.
+
+
Custom Extractors
^^^^^^^^^^^^^^^^^
diff --git a/tests/providers/openlineage/plugins/test_utils.py
b/tests/providers/openlineage/plugins/test_utils.py
index 016bb99d4e..8ca245d1f3 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -21,7 +21,7 @@ import json
import uuid
from json import JSONEncoder
from typing import Any
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
import pytest
from attrs import define
@@ -34,6 +34,7 @@ from airflow.providers.openlineage.utils.utils import (
InfoJsonEncodable,
OpenLineageRedactor,
_is_name_redactable,
+ get_airflow_run_facet,
get_fully_qualified_class_name,
is_operator_disabled,
)
@@ -227,3 +228,39 @@ def test_is_operator_disabled(mock_disabled_operators):
"airflow.operators.python.PythonOperator",
}
assert is_operator_disabled(op) is True
+
+
+@patch("airflow.providers.openlineage.conf.include_full_task_info")
+def test_includes_full_task_info(mock_include_full_task_info):
+ mock_include_full_task_info.return_value = True
+ # There should be no 'bash_command' in excludes and it's not in includes -
so
+ # it's a good choice for checking TaskInfo vs TaskInfoComplete
+ assert (
+ "bash_command"
+ in get_airflow_run_facet(
+ MagicMock(),
+ MagicMock(),
+ MagicMock(),
+ BashOperator(task_id="bash_op", bash_command="sleep 1"),
+ MagicMock(),
+ )["airflow"].task
+ )
+
+
+@patch("airflow.providers.openlineage.conf.include_full_task_info")
+def test_does_not_include_full_task_info(mock_include_full_task_info):
+ from airflow.operators.bash import BashOperator
+
+ mock_include_full_task_info.return_value = False
+ # There should be no 'bash_command' in excludes and it's not in includes -
so
+ # it's a good choice for checking TaskInfo vs TaskInfoComplete
+ assert (
+ "bash_command"
+ not in get_airflow_run_facet(
+ MagicMock(),
+ MagicMock(),
+ MagicMock(),
+ BashOperator(task_id="bash_op", bash_command="sleep 1"),
+ MagicMock(),
+ )["airflow"].task
+ )
diff --git a/tests/providers/openlineage/test_conf.py
b/tests/providers/openlineage/test_conf.py
index 60060b001c..7e0a1c85a7 100644
--- a/tests/providers/openlineage/test_conf.py
+++ b/tests/providers/openlineage/test_conf.py
@@ -21,6 +21,7 @@ from unittest import mock
import pytest
+from airflow.exceptions import AirflowConfigException
from airflow.providers.openlineage.conf import (
_is_true,
_safe_int_convert,
@@ -28,6 +29,7 @@ from airflow.providers.openlineage.conf import (
custom_extractors,
dag_state_change_process_pool_size,
disabled_operators,
+ include_full_task_info,
is_disabled,
is_source_enabled,
namespace,
@@ -52,6 +54,7 @@ _CONFIG_OPTION_DISABLED = "disabled"
_VAR_URL = "OPENLINEAGE_URL"
_CONFIG_OPTION_SELECTIVE_ENABLE = "selective_enable"
_CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE =
"dag_state_change_process_pool_size"
+_CONFIG_OPTION_INCLUDE_FULL_TASK_INFO = "include_full_task_info"
_BOOL_PARAMS = (
("1", True),
@@ -487,3 +490,33 @@ def test_dag_state_change_process_pool_size(var_string,
expected):
with conf_vars({(_CONFIG_SECTION,
_CONFIG_OPTION_DAG_STATE_CHANGE_PROCESS_POOL_SIZE): var_string}):
result = dag_state_change_process_pool_size()
assert result == expected
+
+
[email protected](
+ ("var", "expected"),
+ (
+ ("False", False),
+ ("True", True),
+ ("t", True),
+ ("true", True),
+ ),
+)
+def test_include_full_task_info_reads_config(var, expected):
+ with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO):
var}):
+ assert include_full_task_info() is expected
+
+
[email protected](
+ "var",
+ [
+ "a",
+ "asdf",
+ "31",
+ "",
+ " ",
+ ],
+)
+def test_include_full_task_info_raises_exception(var):
+ with conf_vars({(_CONFIG_SECTION, _CONFIG_OPTION_INCLUDE_FULL_TASK_INFO):
var}):
+ with pytest.raises(AirflowConfigException):
+ include_full_task_info()