This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 469545c1b9c Openlineage: Read HTTP API key auth from Airflow
connection (#66342)
469545c1b9c is described below
commit 469545c1b9c759ab3e49e405612b09a943f0b350
Author: Ulada Zakharava <[email protected]>
AuthorDate: Mon May 25 15:38:52 2026 +0200
Openlineage: Read HTTP API key auth from Airflow connection (#66342)
---
providers/openlineage/docs/configurations-ref.rst | 37 +++-
providers/openlineage/provider.yaml | 9 +
.../src/airflow/providers/openlineage/conf.py | 8 +
.../providers/openlineage/get_provider_info.py | 7 +
.../providers/openlineage/plugins/adapter.py | 31 +++-
.../providers/openlineage/token_provider.py | 133 ++++++++++++++
.../tests/unit/openlineage/plugins/test_adapter.py | 162 ++++++++++++++++-
.../tests/unit/openlineage/test_token_provider.py | 192 +++++++++++++++++++++
8 files changed, 568 insertions(+), 11 deletions(-)
diff --git a/providers/openlineage/docs/configurations-ref.rst
b/providers/openlineage/docs/configurations-ref.rst
index 258c9761d56..fb78ad2424f 100644
--- a/providers/openlineage/docs/configurations-ref.rst
+++ b/providers/openlineage/docs/configurations-ref.rst
@@ -56,6 +56,36 @@ If you want to look at OpenLineage events without sending
them anywhere, you can
[openlineage]
transport = {"type": "console"}
+OpenLineage config stored in an Airflow connection
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can store OpenLineage client configuration in a Generic Airflow connection
instead of putting the full JSON
+configuration directly in ``airflow.cfg``. Set ``config_conn_id`` to the
connection ID and store the OpenLineage
+configuration in the connection extra as JSON.
+
+.. code-block:: ini
+
+ [openlineage]
+ config_conn_id = openlineage_default
+
+Connection extra should contain the OpenLineage client configuration:
+
+.. code-block:: json
+
+ {
+ "transport": {
+ "type": "http",
+ "url": "http://example.com:5000",
+ "auth": {"type": "airflow_connection_api_key"}
+ }
+ }
+
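+For HTTP transports that require API key authentication, you can keep the token in an Airflow
+connection instead of in the OpenLineage configuration. Set ``auth.type`` to
+``airflow_connection_api_key``. When the config is loaded from ``config_conn_id``, the provider
+reads the API key from that same connection by default; set ``auth.conn_id`` to read the token
+from another Airflow connection. When the config comes from ``config_path`` or ``transport``
+instead, ``auth.conn_id`` is required.
+
+The token is taken from the connection password. If the password is empty, the provider looks in
+the connection extra: under the key named by ``auth.extra_key`` when it is set, otherwise under
+the first of ``apiKey``, ``api_key``, ``apikey``, ``token`` or ``access_token`` that has a value.
+The provider resolves ``airflow_connection_api_key`` to standard OpenLineage ``api_key`` auth
+before creating the OpenLineage client.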
+
.. note::
For full list of built-in transport types, specific transport's options or
instructions on how to implement your custom transport, refer to
`Python client documentation
<https://openlineage.io/docs/client/python/configuration#transports>`_.
@@ -100,9 +130,10 @@ Primary, and recommended method of configuring OpenLineage
Airflow Provider is A
As there are multiple possible ways of configuring OpenLineage, it's important
to keep in mind the precedence of different configurations.
OpenLineage Airflow Provider looks for the configuration in the following
order:
-1. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
-2. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
-3. If all the above options are missing, the OpenLineage Python client used
underneath looks for configuration in the order described in `this
<https://openlineage.io/docs/client/python/configuration>`_ documentation.
Please note that **using Airflow configuration is encouraged** and is the only
future proof solution.
+1. Check ``config_conn_id`` in ``airflow.cfg`` under ``openlineage`` section (or AIRFLOW__OPENLINEAGE__CONFIG_CONN_ID environment variable)
+2. Check ``config_path`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable)
+3. Check ``transport`` in ``airflow.cfg`` under ``openlineage`` section (or
AIRFLOW__OPENLINEAGE__TRANSPORT environment variable)
+4. If all the above options are missing, the OpenLineage Python client used
underneath looks for configuration in the order described in `this
<https://openlineage.io/docs/client/python/configuration>`_ documentation.
Please note that **using Airflow configuration is encouraged** and is the only
future proof solution.
.. _configuration_selective_enable:openlineage:
diff --git a/providers/openlineage/provider.yaml
b/providers/openlineage/provider.yaml
index 41489c8d6b4..c8fd3f514b3 100644
--- a/providers/openlineage/provider.yaml
+++ b/providers/openlineage/provider.yaml
@@ -105,6 +105,15 @@ config:
This section applies settings for OpenLineage integration.
options:
+ config_conn_id:
+ description: |
+ Specify a Generic Airflow connection ID that contains OpenLineage
configuration in connection
+ extra. This can be used to keep the OpenLineage transport
configuration, including auth settings,
+ outside of the Airflow configuration file.
+ version_added: ~
+ type: string
+ example: "openlineage_default"
+ default: ""
config_path:
description: |
Specify the path to the YAML configuration file.
diff --git a/providers/openlineage/src/airflow/providers/openlineage/conf.py
b/providers/openlineage/src/airflow/providers/openlineage/conf.py
index 0fc4611e0c6..3a2ca201e53 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/conf.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/conf.py
@@ -54,6 +54,12 @@ def config_path(check_legacy_env_var: bool = True) -> str:
return option
@cache
def config_conn_id() -> str:
    """
    Return ``[openlineage] config_conn_id``.

    The option names an Airflow connection whose extra holds the OpenLineage client
    configuration. An empty string (the fallback) means the option is not set.
    """
    option = conf.get(_CONFIG_SECTION, "config_conn_id", fallback="")
    return option
+
+
@cache
def is_source_enabled() -> bool:
"""[openlineage] disable_source_code."""
@@ -136,6 +142,8 @@ def is_disabled() -> bool:
if _is_true(os.getenv("OPENLINEAGE_DISABLED", "")): # Check legacy
variable
return True
+ if config_conn_id(): # Check if config connection is present
+ return False
if transport(): # Check if transport is present
return False
if config_path(True): # Check if config file is present
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
index c222b76aa5d..60cf1a981a6 100644
---
a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
+++
b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py
@@ -50,6 +50,13 @@ def get_provider_info():
"openlineage": {
"description": "This section applies settings for OpenLineage
integration.\n",
"options": {
+ "config_conn_id": {
+ "description": "Specify a Generic Airflow connection
ID that contains OpenLineage configuration in connection\nextra. This can be
used to keep the OpenLineage transport configuration, including auth
settings,\noutside of the Airflow configuration file.\n",
+ "version_added": None,
+ "type": "string",
+ "example": "openlineage_default",
+ "default": "",
+ },
"config_path": {
"description": "Specify the path to the YAML
configuration file.\nThis ensures backwards compatibility with passing config
through the `openlineage.yml` file.\n",
"version_added": None,
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
index 594afe12280..ec81e232fc8 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import os
import traceback
-from typing import TYPE_CHECKING, Literal
+from typing import TYPE_CHECKING, Any, Literal
import yaml
from openlineage.client import OpenLineageClient, set_producer
@@ -36,6 +36,10 @@ from openlineage.client.facet_v2 import (
from airflow.providers.common.compat.sdk import Stats, conf as airflow_conf
from airflow.providers.openlineage import conf
+from airflow.providers.openlineage.token_provider import (
+ AirflowConnectionConfigProvider,
+ resolve_airflow_connection_auth,
+)
from airflow.providers.openlineage.utils.utils import (
_PRODUCER,
OpenLineageRedactor,
@@ -103,22 +107,35 @@ class OpenLineageAdapter(LoggingMixin):
return self._client
def get_openlineage_config(self) -> dict | None:
    """
    Build the OpenLineage client configuration from Airflow settings.

    Sources are tried in priority order and the first one that is set wins:

    1. ``[openlineage] config_conn_id`` - the config is read from that connection's extra, and
       ``airflow_connection_api_key`` auth defaults to the same connection.
    2. ``[openlineage] config_path`` - the config is read from a YAML file.
    3. ``[openlineage] transport`` - the transport config is wrapped as ``{"transport": ...}``.

    Any ``airflow_connection_api_key`` auth block is replaced in place with ``api_key`` auth.

    :return: the resolved config, or ``None`` when no source is configured or the YAML file
        loads as ``None`` (e.g. it is empty).
    """
    conn_id = conf.config_conn_id()
    if conn_id:
        conn_config = AirflowConnectionConfigProvider(conn_id).get_config()
        # The config connection doubles as the default source of the API key.
        resolve_airflow_connection_auth(config=conn_config, config_conn_id=conn_id)
        return conn_config
    self.log.debug("OpenLineage config_conn_id configuration not found.")

    yaml_path = conf.config_path(check_legacy_env_var=False)
    if yaml_path:
        yaml_config = self._read_yaml_config(yaml_path)
        if yaml_config is not None:
            resolve_airflow_connection_auth(yaml_config)
        return yaml_config
    self.log.debug("OpenLineage config_path configuration not found.")

    transport_config = conf.transport()
    if not transport_config:
        self.log.debug("OpenLineage transport configuration not found.")
        return None
    wrapped_config = {"transport": transport_config}
    resolve_airflow_connection_auth(wrapped_config)
    return wrapped_config
@staticmethod
def _read_yaml_config(path: str) -> dict[str, Any] | None:
    """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the loaded object."""
    with open(path) as yaml_file:
        parsed = yaml.safe_load(yaml_file)
    return parsed
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/token_provider.py
b/providers/openlineage/src/airflow/providers/openlineage/token_provider.py
new file mode 100644
index 00000000000..330d8a7375d
--- /dev/null
+++ b/providers/openlineage/src/airflow/providers/openlineage/token_provider.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+from airflow.providers.common.compat.sdk import AirflowException, BaseHook
+
# Auth ``type`` value that asks the provider to fetch the API key from an Airflow connection.
AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE = "airflow_connection_api_key"
# Connection-extra keys probed, in this order, when no explicit ``extra_key`` is configured.
_DEFAULT_EXTRA_KEYS = (
    "apiKey",
    "api_key",
    "apikey",
    "token",
    "access_token",
)
+
+
class OpenLineageAirflowConnectionAuthError(AirflowException):
    """
    Signal that ``airflow_connection_api_key`` auth could not be turned into an API key.

    Raised when no connection ID is available or the connection holds no usable token.
    """


class OpenLineageAirflowConnectionConfigError(AirflowException):
    """
    Signal that the OpenLineage client config could not be loaded from an Airflow connection.

    Raised when the connection ID is empty or the connection extra is not a valid config.
    """
+
+
class AirflowConnectionConfigProvider:
    """
    Resolve OpenLineage client configuration from an Airflow connection.

    The connection extra holds the full OpenLineage client config as a JSON object, for example
    ``{"transport": {"type": "console"}}``.

    :param conn_id: ID of the Airflow connection to read; must be non-empty.
    :raises OpenLineageAirflowConnectionConfigError: if ``conn_id`` is empty.
    """

    def __init__(self, conn_id: str) -> None:
        if not conn_id:
            raise OpenLineageAirflowConnectionConfigError(
                "OpenLineage connection config requires a non-empty connection ID."
            )
        self.conn_id = conn_id

    def get_config(self) -> dict[str, Any]:
        """
        Fetch the connection and return its extra as a validated OpenLineage config.

        :raises OpenLineageAirflowConnectionConfigError: if the extra is not a JSON object or
            does not contain a ``transport`` JSON object.
        """
        connection = BaseHook.get_connection(self.conn_id)
        return self._validate_config(connection.extra_dejson)

    def _validate_config(self, config: Any) -> dict[str, Any]:
        if not isinstance(config, dict):
            # Report only the type, never the value: the extra may embed credentials
            # (e.g. an inline ``apiKey``), and exception messages may be logged.
            raise OpenLineageAirflowConnectionConfigError(
                f"OpenLineage connection `{self.conn_id}` extra must be a JSON object, "
                f"got {type(config).__name__}."
            )
        if not isinstance(config.get("transport"), dict):
            raise OpenLineageAirflowConnectionConfigError(
                "OpenLineage connection config must contain a `transport` JSON object."
            )
        return config
+
+
+class AirflowConnectionTokenProvider:
+ """
+ Resolve an OpenLineage API key from an Airflow connection.
+
+ The connection password is preferred. If it is empty and ``extra_key`` is
configured, that key
+ is read from connection ``extra``. Otherwise, common extra keys are
checked.
+ """
+
+ def __init__(self, config: dict[str, Any], default_conn_id: str | None =
None) -> None:
+ self.conn_id = config.get("conn_id") or default_conn_id or ""
+ self.extra_key = config.get("extra_key")
+ if not self.conn_id:
+ raise OpenLineageAirflowConnectionAuthError(
+ "OpenLineage `airflow_connection_api_key` auth requires a
non-empty `conn_id`."
+ )
+
+ def get_api_key(self) -> str:
+ connection = BaseHook.get_connection(self.conn_id)
+ if connection.password:
+ return connection.password.strip()
+ api_key = self._get_api_key_from_extra(connection.extra_dejson)
+ if api_key:
+ return api_key
+
+ raise OpenLineageAirflowConnectionAuthError(
+ "OpenLineage `airflow_connection_api_key` auth could not find a
token in connection "
+ f"`{self.conn_id}`. Expected connection password or token in
connection extra."
+ )
+
+ def _get_api_key_from_extra(self, extra: dict[str, Any]) -> str | None:
+ if self.extra_key:
+ value = extra.get(self.extra_key)
+ return str(value).strip() if value else None
+
+ for key in _DEFAULT_EXTRA_KEYS:
+ value = extra.get(key)
+ if value:
+ return str(value).strip()
+ return None
+
+
+def resolve_airflow_connection_auth(config: dict[str, Any] | None,
config_conn_id: str | None = None) -> None:
+ """
+ Read the API key from an Airflow connection and put it into the
OpenLineage config.
+
+ OpenLineage config can contain one transport, a composite transport, or
composite transports
+ nested inside each other. This function walks through that structure and
updates every matching
+ ``auth`` block in place.
+
+ This only makes sense for HTTP transports: ``airflow_connection_api_key``
is replaced with
+ ``{"type": "api_key", "apiKey": ...}``.
+ """
+ if not isinstance(config, dict):
+ return
+
+ for key, value in config.items():
+ if (
+ key == "auth"
+ and isinstance(value, dict)
+ and value.get("type") == AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE
+ ):
+ provider = AirflowConnectionTokenProvider(value,
default_conn_id=config_conn_id)
+ config[key] = {"type": "api_key", "apiKey": provider.get_api_key()}
+ elif key == "transports" and isinstance(value, list):
+ for item in value:
+ resolve_airflow_connection_auth(item,
config_conn_id=config_conn_id)
+ else:
+ resolve_airflow_connection_auth(value,
config_conn_id=config_conn_id)
diff --git
a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
index 30e18d0b698..a3a252b8cbb 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import datetime
+import json
import os
import pathlib
import uuid
@@ -41,7 +42,7 @@ from openlineage.client.facet_v2 import (
from airflow import DAG
from airflow.models.dagrun import DagRun, DagRunState
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
-from airflow.providers.common.compat.sdk import Stats
+from airflow.providers.common.compat.sdk import BaseHook, Connection, Stats
from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER,
OpenLineageAdapter
@@ -50,6 +51,11 @@ from airflow.providers.openlineage.plugins.facets import (
AirflowDebugRunFacet,
AirflowStateRunFacet,
)
+from airflow.providers.openlineage.token_provider import (
+ AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+ OpenLineageAirflowConnectionAuthError,
+ OpenLineageAirflowConnectionConfigError,
+)
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
@@ -102,6 +108,160 @@ def test_create_client_from_config_with_options():
assert client.transport.url == "http://ol-api:5000"
@patch.object(BaseHook, "get_connection")
@conf_vars(
    {
        ("openlineage", "transport"): json.dumps(
            {
                "type": "http",
                "url": "http://ol-api:5000",
                "auth": {"type": "api_key", "apiKey": "api-key"},
            }
        )
    }
)
def test_create_client_from_config_without_connection_auth_does_not_read_connection(mock_get_connection):
    """Regular ``api_key`` auth is used as-is and never triggers a connection lookup."""
    transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    mock_get_connection.assert_not_called()


def _connection_auth_transport_config(**auth_config):
    """
    Serialize an HTTP transport config that uses ``airflow_connection_api_key`` auth.

    Keyword arguments are merged into (and may override) the default auth block, which points
    at the ``openlineage_default`` connection.
    """
    auth = {"type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE, "conn_id": "openlineage_default"}
    auth.update(auth_config)
    transport = {"type": "http", "url": "http://ol-api:5000", "auth": auth}
    return json.dumps(transport)


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_password(mock_get_connection):
    """The connection password is injected as the HTTP transport API key."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default", conn_type="http", password="api-key"
    )
    transport_config = _connection_auth_transport_config()

    with conf_vars({("openlineage", "transport"): transport_config}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    assert transport.config.auth.api_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_extra(mock_get_connection):
    """``extra_key`` selects the connection-extra entry that holds the token."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        extra=json.dumps({"lineage_token": "api-key-from-extra"}),
    )
    transport_config = _connection_auth_transport_config(extra_key="lineage_token")

    with conf_vars({("openlineage", "transport"): transport_config}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.config.auth.api_key == "api-key-from-extra"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_token_extra(mock_get_connection):
    """Without ``extra_key``, a ``token`` entry in the connection extra is picked up."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        extra=json.dumps({"token": "api-key-from-token"}),
    )
    transport_config = _connection_auth_transport_config()

    with conf_vars({("openlineage", "transport"): transport_config}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.config.auth.api_key == "api-key-from-token"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_config_with_connection_auth_missing_secret(mock_get_connection):
    """A connection with no password and no token in its extra raises a dedicated error."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default", conn_type="http", extra="{}"
    )
    transport_config = _connection_auth_transport_config()

    with (
        conf_vars({("openlineage", "transport"): transport_config}),
        pytest.raises(OpenLineageAirflowConnectionAuthError, match="could not find a token"),
    ):
        OpenLineageAdapter().get_or_create_openlineage_client()


@patch.object(BaseHook, "get_connection")
def test_create_client_from_connection_config_with_connection_auth_password(mock_get_connection):
    """With ``config_conn_id``, the config connection's own password is the default API key."""
    openlineage_config = {
        "transport": {
            "type": "http",
            "url": "http://ol-api:5000",
            "auth": {"type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE},
        }
    }
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        password="api-key",
        extra=json.dumps(openlineage_config),
    )

    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
        transport = OpenLineageAdapter().get_or_create_openlineage_client().transport

    assert transport.kind == "http"
    assert transport.url == "http://ol-api:5000"
    assert transport.config.auth.api_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_create_client_from_connection_transport_config(mock_get_connection):
    """A transport stored in the config connection's extra is used to build the client."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra=json.dumps({"transport": {"type": "console"}}),
    )

    with conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}):
        client = OpenLineageAdapter().get_or_create_openlineage_client()

    assert client.transport.kind == "console"


@patch.object(BaseHook, "get_connection")
def test_connection_config_takes_precedence_over_transport_config(mock_get_connection):
    """``config_conn_id`` wins over ``[openlineage] transport`` when both are set."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra=json.dumps({"transport": {"type": "console"}}),
    )
    settings = {
        ("openlineage", "config_conn_id"): "openlineage_default",
        ("openlineage", "transport"): json.dumps({"type": "http", "url": "http://ol-api:5000"}),
    }

    with conf_vars(settings):
        client = OpenLineageAdapter().get_or_create_openlineage_client()

    assert client.transport.kind == "console"


@patch.object(BaseHook, "get_connection")
def test_connection_config_missing_transport_raises_custom_exception(mock_get_connection):
    """A config connection whose extra lacks ``transport`` is rejected with a dedicated error."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra=json.dumps({"url": "http://ol-api:5000"}),
    )

    with (
        conf_vars({("openlineage", "config_conn_id"): "openlineage_default"}),
        pytest.raises(
            OpenLineageAirflowConnectionConfigError,
            match="must contain a `transport` JSON object",
        ),
    ):
        OpenLineageAdapter().get_or_create_openlineage_client()
+
+
def test_create_client_from_yaml_config():
current_folder = pathlib.Path(__file__).parent.resolve()
yaml_config = str((current_folder / "openlineage_configs" /
"http.yaml").resolve())
diff --git
a/providers/openlineage/tests/unit/openlineage/test_token_provider.py
b/providers/openlineage/tests/unit/openlineage/test_token_provider.py
new file mode 100644
index 00000000000..afd49433cb3
--- /dev/null
+++ b/providers/openlineage/tests/unit/openlineage/test_token_provider.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.common.compat.sdk import BaseHook, Connection
+from airflow.providers.openlineage.token_provider import (
+ AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE,
+ AirflowConnectionConfigProvider,
+ AirflowConnectionTokenProvider,
+ OpenLineageAirflowConnectionAuthError,
+ OpenLineageAirflowConnectionConfigError,
+ resolve_airflow_connection_auth,
+)
+
+
@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_connection_password(mock_get_connection):
    """The connection password is returned as the API key."""
    connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = connection

    api_key = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"}).get_api_key()

    assert api_key == "api-key"


@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_default_connection_id(mock_get_connection):
    """Without ``conn_id`` in the auth config, ``default_conn_id`` is used."""
    connection = Connection(conn_id="openlineage_default", conn_type="http", password="api-key")
    mock_get_connection.return_value = connection

    token_provider = AirflowConnectionTokenProvider({}, default_conn_id="openlineage_default")

    assert token_provider.get_api_key() == "api-key"


@patch.object(BaseHook, "get_connection")
def test_get_api_key_from_connection_extra(mock_get_connection):
    """With no password, an ``api_key`` entry in the connection extra is used."""
    connection = Connection(
        conn_id="openlineage_default",
        conn_type="http",
        extra='{"api_key": "api-key-from-extra"}',
    )
    mock_get_connection.return_value = connection

    token_provider = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"})

    assert token_provider.get_api_key() == "api-key-from-extra"


def test_missing_conn_id_raises_custom_exception():
    """An auth config with no connection ID at all is rejected up front."""
    expected_error = "requires a non-empty `conn_id`"
    with pytest.raises(OpenLineageAirflowConnectionAuthError, match=expected_error):
        AirflowConnectionTokenProvider({})


@patch.object(BaseHook, "get_connection")
def test_missing_token_raises_custom_exception(mock_get_connection):
    """A connection with neither password nor extra token raises a dedicated error."""
    mock_get_connection.return_value = Connection(conn_id="openlineage_default", conn_type="http")
    token_provider = AirflowConnectionTokenProvider({"conn_id": "openlineage_default"})

    with pytest.raises(OpenLineageAirflowConnectionAuthError, match="could not find a token"):
        token_provider.get_api_key()


@patch.object(BaseHook, "get_connection")
def test_resolve_connection_auth_in_composite_transport(mock_get_connection):
    """An ``airflow_connection_api_key`` auth block inside a composite transport is resolved."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default", conn_type="http", password="api-key"
    )
    http_transport = {
        "type": "http",
        "url": "http://ol-api:5000",
        "auth": {"type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE, "conn_id": "openlineage_default"},
    }
    config = {"transport": {"type": "composite", "transports": [http_transport]}}

    resolve_airflow_connection_auth(config)

    resolved_auth = config["transport"]["transports"][0]["auth"]
    assert resolved_auth == {"type": "api_key", "apiKey": "api-key"}


@patch.object(BaseHook, "get_connection")
def test_resolve_connection_auth_in_nested_composite_transport(mock_get_connection):
    """Auth blocks are resolved at every nesting level of composite transports."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default", conn_type="http", password="api-key"
    )
    outer_http = {
        "type": "http",
        "url": "http://ol-api-1:5000",
        "auth": {"type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE, "conn_id": "openlineage_default"},
    }
    inner_http = {
        "type": "http",
        "url": "http://ol-api-2:5000",
        "auth": {"type": AIRFLOW_CONNECTION_API_KEY_AUTH_TYPE, "conn_id": "openlineage_default"},
    }
    inner_composite = {"type": "composite", "transports": [inner_http, {"type": "console"}]}
    config = {"transport": {"type": "composite", "transports": [outer_http, inner_composite]}}

    resolve_airflow_connection_auth(config)

    expected_auth = {"type": "api_key", "apiKey": "api-key"}
    top_level = config["transport"]["transports"]
    assert top_level[0]["auth"] == expected_auth
    assert top_level[1]["transports"][0]["auth"] == expected_auth
    assert top_level[1]["transports"][1] == {"type": "console"}


@patch.object(BaseHook, "get_connection")
def test_get_openlineage_config_from_connection_extra(mock_get_connection):
    """The connection extra is returned unchanged as the OpenLineage config."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"transport": {"type": "console"}}',
    )

    loaded = AirflowConnectionConfigProvider("openlineage_default").get_config()

    assert loaded == {"transport": {"type": "console"}}


def test_missing_config_conn_id_raises_custom_exception():
    """An empty config connection ID is rejected up front."""
    expected_error = "requires a non-empty connection ID"
    with pytest.raises(OpenLineageAirflowConnectionConfigError, match=expected_error):
        AirflowConnectionConfigProvider("")


@patch.object(BaseHook, "get_connection")
def test_missing_config_raises_custom_exception(mock_get_connection):
    """A connection extra without a ``transport`` object is rejected."""
    mock_get_connection.return_value = Connection(
        conn_id="openlineage_default",
        conn_type="generic",
        extra='{"url": "http://ol-api:5000"}',
    )
    config_provider = AirflowConnectionConfigProvider("openlineage_default")

    expected_error = "must contain a `transport` JSON object"
    with pytest.raises(OpenLineageAirflowConnectionConfigError, match=expected_error):
        config_provider.get_config()