This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f0e3e612ba feat: Add openlineage docs ext with list of supported
operators and hooks. (#36311)
f0e3e612ba is described below
commit f0e3e612ba96d86e8122d702e1b51a46ecbd414c
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Jan 3 15:01:20 2024 +0100
feat: Add openlineage docs ext with list of supported operators and hooks.
(#36311)
---
.../apache-airflow-providers-openlineage/index.rst | 1 +
.../supported_classes.rst | 28 +++
docs/conf.py | 1 +
docs/exts/providers_extensions.py | 188 +++++++++++++++++++++
docs/exts/templates/openlineage.rst.jinja2 | 51 ++++++
5 files changed, 269 insertions(+)
diff --git a/docs/apache-airflow-providers-openlineage/index.rst
b/docs/apache-airflow-providers-openlineage/index.rst
index cdd7c7ae9c..64ccaf583d 100644
--- a/docs/apache-airflow-providers-openlineage/index.rst
+++ b/docs/apache-airflow-providers-openlineage/index.rst
@@ -72,6 +72,7 @@
PyPI Repository
<https://pypi.org/project/apache-airflow-providers-openlineage/>
Installing from sources <installing-providers-from-sources>
+ Supported operators <supported_classes.rst>
.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE
OVERWRITTEN AT RELEASE TIME!
diff --git a/docs/apache-airflow-providers-openlineage/supported_classes.rst
b/docs/apache-airflow-providers-openlineage/supported_classes.rst
new file mode 100644
index 0000000000..81266929cf
--- /dev/null
+++ b/docs/apache-airflow-providers-openlineage/supported_classes.rst
@@ -0,0 +1,28 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Supported operators
+===================
+
+Below is a list of operators that support OpenLineage extraction,
+along with the specific database types that are compatible with the SQLExecuteQueryOperator.
+
+**Disclaimer:** While we strive to keep the list of supported operators current,
+please be aware that it is generated automatically and may not always capture everything accurately.
+
+.. airflow-providers-openlineage-supported-classes::
diff --git a/docs/conf.py b/docs/conf.py
index f66e016ee9..053812b6dc 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -185,6 +185,7 @@ elif PACKAGE_NAME.startswith("apache-airflow-providers-"):
[
"extra_provider_files_with_substitutions",
"autoapi.extension",
+ "providers_extensions",
]
)
else:
diff --git a/docs/exts/providers_extensions.py
b/docs/exts/providers_extensions.py
new file mode 100644
index 0000000000..2a564a9ddc
--- /dev/null
+++ b/docs/exts/providers_extensions.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Module for provider's custom Sphinx extensions that will be loaded in all
providers' documentation."""
+from __future__ import annotations
+
+import ast
+import os
+from pathlib import Path
+from typing import Any, Iterable
+
+import yaml
+
+# ``provider_yaml_utils`` is imported as a top-level module (no package prefix), so this
+# presumably relies on docs/exts being on sys.path when Sphinx loads this extension.
+from provider_yaml_utils import get_provider_yaml_paths
+
+from docs.exts.operators_and_hooks_ref import (
+ DEFAULT_HEADER_SEPARATOR,
+ BaseJinjaReferenceDirective,
+ _render_template,
+)
+
+
+def _get_module_class_registry(
+ module_filepath: str, class_extras: dict[str, Any]
+) -> dict[str, dict[str, Any]]:
+ """Extracts classes and its information from a Python module file.
+
+ The function parses the specified module file and registers all classes.
+ The registry for each class includes the module filename, methods, base
classes
+ and any additional class extras provided.
+
+ :param module_filepath: The file path of the module.
+ :param class_extras: Additional information to include in each class's
registry.
+
+ :return: A dictionary with class names as keys and their corresponding
information.
+ """
+ with open(module_filepath) as file:
+ ast_obj = ast.parse(file.read())
+
+ module_class_registry = {
+ node.name: {
+ "module_filepath": module_filepath,
+ "methods": {n.name for n in ast.walk(node) if isinstance(n,
ast.FunctionDef)},
+ "base_classes": [b.id for b in node.bases if isinstance(b,
ast.Name)],
+ **class_extras,
+ }
+ for node in ast_obj.body
+ if isinstance(node, ast.ClassDef)
+ }
+ return module_class_registry
+
+
+def _has_method(
+ class_name: str, method_names: Iterable[str], class_registry: dict[str,
dict[str, Any]]
+) -> bool:
+ """Determines if a class or its bases in the registry have any of the
specified methods.
+
+ :param class_name: The name of the class to check.
+ :param method_names: A list of names of methods to search for.
+ :param class_registry: A dictionary representing the class registry, where
each key is a class name
+ and the value is its metadata.
+ :return: True if any of the specified methods are found in the class or
its base classes; False otherwise.
+
+ Example:
+ >>> example_class_registry = {
+ ... "MyClass": {"methods": {"foo", "bar"}, "base_classes":
["BaseClass"]},
+ ... "BaseClass": {"methods": {"base_foo"}, "base_classes": []},
+ ... }
+ >>> _has_method("MyClass", ["foo"], example_class_registry)
+ True
+ >>> _has_method("MyClass", ["base_foo"], example_class_registry)
+ True
+ >>> _has_method("MyClass", ["not_a_method"], example_class_registry)
+ False
+ """
+ if class_name in class_registry:
+ if any(method in class_registry[class_name]["methods"] for method in
method_names):
+ return True
+ for base_name in class_registry[class_name]["base_classes"]:
+ if _has_method(base_name, method_names, class_registry):
+ return True
+ return False
+
+
def _get_providers_class_registry() -> dict[str, dict[str, Any]]:
    """Builds a registry of classes from all providers' source code.

    For every provider YAML file returned by ``get_provider_yaml_paths()``, the provider's
    package name is read. Then every ``.py`` module in the YAML file's directory tree,
    except ``__init__.py``, is parsed and its top-level classes are registered.

    Entries are keyed by bare class name. If the same class name appears in several modules,
    the module processed last overwrites the earlier entries.

    :return: A dictionary mapping class names to their registry information (module file path,
        methods, base classes and ``provider_name``).
    """
    class_registry: dict[str, dict[str, Any]] = {}
    for provider_yaml_path in get_provider_yaml_paths():
        provider_yaml_file = Path(provider_yaml_path)
        provider_yaml_content = yaml.safe_load(provider_yaml_file.read_text())
        class_extras = {"provider_name": provider_yaml_content["package-name"]}
        for root, _, file_names in os.walk(provider_yaml_file.parent):
            for file_name in file_names:
                # Compare the bare file name: the joined relative path is never exactly "__init__.py".
                if not file_name.endswith(".py") or file_name == "__init__.py":
                    continue
                # Path relative to the current working directory; the renderer later turns it
                # into a dotted module name.
                module_filepath = f"{os.path.relpath(root)}/{file_name}"
                class_registry.update(
                    _get_module_class_registry(module_filepath=module_filepath, class_extras=class_extras)
                )

    return class_registry
+
+
def _render_openlineage_supported_classes_content():
    """Render the RST listing of classes that implement OpenLineage methods.

    Operators (class names ending in "operator") that define one of the OpenLineage facet
    methods, themselves or via a base class, are grouped by provider. Hooks (class names
    ending in "hook") that define one of the OpenLineage DB methods are listed with a DB type
    derived from their class name. ``DbApiHook`` and ``SQLExecuteQueryOperator`` are removed
    from the registry because the template documents them directly.

    :return: The rendered ``openlineage.rst.jinja2`` template.
    """
    openlineage_operator_methods = ("get_openlineage_facets_on_complete", "get_openlineage_facets_on_start")
    openlineage_db_hook_methods = (
        "get_openlineage_database_info",
        "get_openlineage_database_specific_lineage",
    )

    class_registry = _get_providers_class_registry()
    # These excluded classes will be included in docs directly
    class_registry.pop("DbApiHook")
    class_registry.pop("SQLExecuteQueryOperator")

    providers: dict[str, dict[str, list[str]]] = {}
    db_hooks: list[tuple[str, str]] = []
    for class_name, info in class_registry.items():
        if class_name.startswith("_"):
            continue
        # Strip only the file extension. A blanket ``replace(".py", "")`` would also mangle
        # modules/packages whose names start with "py" (``extractors.python.py`` -> ``extractors.thon``).
        module_path, _ = os.path.splitext(info["module_filepath"])
        # Leading dots come from relative path prefixes such as "../".
        module_name = module_path.replace("/", ".").lstrip(".")
        class_path = f"{module_name}.{class_name}"
        provider_entry = providers.setdefault(info["provider_name"], {"operators": []})

        if class_name.lower().endswith("operator"):
            if _has_method(
                class_name=class_name,
                method_names=openlineage_operator_methods,
                class_registry=class_registry,
            ):
                provider_entry["operators"].append(class_path)
        elif class_name.lower().endswith("hook"):
            if _has_method(
                class_name=class_name,
                method_names=openlineage_db_hook_methods,
                class_registry=class_registry,
            ):
                db_type = class_name.replace("SqlApiHook", "").replace("Hook", "")
                db_hooks.append((db_type, class_path))

    # Sort by class name, then by full path, so entries that share a class name still render
    # in a deterministic order (set iteration order is not stable across runs).
    providers = {
        provider: {
            key: sorted(set(value), key=lambda x: (x.split(".")[-1], x)) for key, value in details.items()
        }
        for provider, details in sorted(providers.items())
        if any(details.values())
    }
    # One hook per DB type; the last one seen wins.
    db_hooks = sorted({db_type: hook for db_type, hook in db_hooks}.items(), key=lambda x: x[0])

    return _render_template(
        "openlineage.rst.jinja2",
        providers=providers,
        db_hooks=db_hooks,
    )
+
+
class OpenLineageSupportedClassesDirective(BaseJinjaReferenceDirective):
    """Sphinx directive that renders the list of classes supporting OpenLineage.

    The output is fully produced by ``_render_openlineage_supported_classes_content``;
    nothing from the directive invocation influences it.
    """

    def render_content(self, *, tags: set[str] | None, header_separator: str = DEFAULT_HEADER_SEPARATOR):
        # ``tags`` and ``header_separator`` are accepted to match the signature the base
        # directive presumably calls with; they are intentionally unused here.
        rendered_rst = _render_openlineage_supported_classes_content()
        return rendered_rst
+
+
def setup(app):
    """Register the OpenLineage supported-classes directive with Sphinx.

    :param app: The Sphinx application object.
    :return: Extension metadata declaring the extension safe for parallel reading and writing.
    """
    directive_name = "airflow-providers-openlineage-supported-classes"
    app.add_directive(directive_name, OpenLineageSupportedClassesDirective)

    extension_metadata = {"parallel_read_safe": True, "parallel_write_safe": True}
    return extension_metadata
diff --git a/docs/exts/templates/openlineage.rst.jinja2
b/docs/exts/templates/openlineage.rst.jinja2
new file mode 100644
index 0000000000..7dffc175f8
--- /dev/null
+++ b/docs/exts/templates/openlineage.rst.jinja2
@@ -0,0 +1,51 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+#}
+Core operators
+==============
+At the moment, two core operators support OpenLineage. These operators function as a 'black box,'
+capable of running any code, which might limit the extent of lineage extraction.
+
+- :class:`~airflow.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`)
+- :class:`~airflow.operators.bash.BashOperator` (via :class:`airflow.providers.openlineage.extractors.bash.BashExtractor`)
+
+
+:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
+============================================================================
+
+This operator uses SQL parsing for lineage extraction. To extract database-specific data,
+a dedicated Hook implementing the OpenLineage methods is required. Currently, the following databases are supported:
+
+{% for db_type, hook in db_hooks %}
+- {{ db_type }} (via :class:`~{{ hook }}`)
+{% endfor %}
+
+
+Provider's operators
+====================
+The operators listed below from each provider are natively equipped with
OpenLineage support.
+
+{%for provider_name, provider_dict in providers.items() %}
+{{ provider_name }}
+{{ '"' * (provider_name|length) }}
+
+{% for operator in provider_dict['operators'] %}
+- :class:`~{{ operator }}`
+{% endfor %}
+
+{% endfor %}