This is an automated email from the ASF dual-hosted git repository. mobuchowski pushed a commit to branch openlineage-systemtests in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0935241484a515909351b143df96ac8c9371ebcb Author: Maciej Obuchowski <[email protected]> AuthorDate: Thu Oct 10 16:06:07 2024 +0200 add basic system tests for OpenLineage Signed-off-by: Maciej Obuchowski <[email protected]> --- .../providers/openlineage/transport/__init__.py | 16 ++ .../providers/openlineage/transport/variable.py | 56 ++++++ .../providers/openlineage/utils/operator.py | 200 +++++++++++++++++++++ providers/tests/system/openlineage/__init__.py | 16 ++ providers/tests/system/openlineage/event.json | 5 + .../system/openlineage/example_openlineage.py | 50 ++++++ 6 files changed, 343 insertions(+) diff --git a/providers/src/airflow/providers/openlineage/transport/__init__.py b/providers/src/airflow/providers/openlineage/transport/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/providers/src/airflow/providers/openlineage/transport/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/src/airflow/providers/openlineage/transport/variable.py b/providers/src/airflow/providers/openlineage/transport/variable.py new file mode 100644 index 0000000000..2841c1e0ea --- /dev/null +++ b/providers/src/airflow/providers/openlineage/transport/variable.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from openlineage.client.serde import Serde +from openlineage.client.transport import Transport + +from airflow.models import Variable +from airflow.plugins_manager import AirflowPlugin, plugins +from airflow.utils.log.logging_mixin import LoggingMixin + +if TYPE_CHECKING: + from openlineage.client.client import Event + + +class VariableTransport(Transport, LoggingMixin): + """ + This transport sends OpenLineage events to Variables. + + Key schema is <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>. + It's made to be used in system tests, stored data read by OpenLineageTestOperator. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + ... + + def emit(self, event: Event): + from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin + + plugin: AirflowPlugin | None = next( # type: ignore[assignment] + filter(lambda x: isinstance(x, OpenLineageProviderPlugin), plugins) # type: ignore[arg-type] + ) + if not plugin: + raise RuntimeError("OpenLineage listener should be set up here") + listener = plugin.listeners[0] # type: ignore + ti = listener.current_ti # type: ignore + key = f"{ti.dag_id}.{ti.task_id}.event.{event.eventType.value.lower()}" # type: ignore[union-attr] + str_event = Serde.to_json(event) + Variable.set(key=key, value=str_event) diff --git a/providers/src/airflow/providers/openlineage/utils/operator.py b/providers/src/airflow/providers/openlineage/utils/operator.py new file mode 100644 index 0000000000..b495a4f4d1 --- /dev/null +++ b/providers/src/airflow/providers/openlineage/utils/operator.py @@ -0,0 +1,200 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import json +import logging +import os +import typing +import uuid +from urllib.parse import urlparse + +from dateutil.parser import parse +from jinja2 import Environment + +from airflow.models import BaseOperator +from airflow.models.variable import Variable + +if typing.TYPE_CHECKING: + from airflow.utils.context import Context + +log = logging.getLogger(__name__) + + +def any(result): + return result + + +def is_datetime(result): + try: + parse(result) + return "true" + except Exception: + pass + return "false" + + +def is_uuid(result): + try: + uuid.UUID(result) + return "true" + except Exception: + pass + return "false" + + +def env_var(var: str, default: str | None = None) -> str: + """ + Use this jinja method to access the environment variable named 'var'. + + If there is no such environment variable set, return the default. + If the default is None, raise an exception for an undefined variable. + """ + if var in os.environ: + return os.environ[var] + elif default is not None: + return default + else: + msg = f"Env var required but not provided: '{var}'" + raise ValueError(msg) + + +def not_match(result, pattern) -> str: + if pattern in result: + raise ValueError(f"Found {pattern} in {result}") + return "true" + + +def url_scheme_authority(url) -> str: + parsed = urlparse(url) + return f"{parsed.scheme}://{parsed.netloc}" + + +def url_path(url) -> str: + return urlparse(url).path + + +def setup_jinja() -> Environment: + env = Environment() + env.globals["any"] = any + env.globals["is_datetime"] = is_datetime + env.globals["is_uuid"] = is_uuid + env.globals["env_var"] = env_var + env.globals["not_match"] = not_match + env.filters["url_scheme_authority"] = url_scheme_authority + env.filters["url_path"] = url_path + return env + + +env = setup_jinja() + + +def match(expected, result) -> bool: + """ + Check if result is "equal" to expected value. + + Omits keys not specified in expected value and resolves any jinja templates found. + """ + if isinstance(expected, dict): + # Take a look only at keys present at expected dictionary + for k, v in expected.items(): + if k not in result: + log.error("Key %s not in received event %s\nExpected %s", k, result, expected) + return False + if not match(v, result[k]): + log.error( + "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s", + k, + v, + result[k], + expected, + result, + ) + return False + elif isinstance(expected, list): + if len(expected) != len(result): + log.error("Length does not match: expected %d, result: %d", len(expected), len(result)) + return False + for i, x in enumerate(expected): + if not match(x, result[i]): + log.error( + "List not matched at %d\nexpected:\n%s\nresult: \n%s", + i, + json.dumps(x), + json.dumps(result[i]), + ) + return False + elif isinstance(expected, str): + if "{{" in expected: + # Evaluate jinja: in some cases, we want to check only if key exists, or if + # value has the right type + try: + rendered = env.from_string(expected).render(result=result) + except ValueError as e: + log.error("Error rendering jinja template %s: %s", expected, e) + return False + if rendered == "true" or rendered == result: + return True + log.error("Rendered value %s does not equal 'true' or %s", rendered, result) + return False + elif expected != result: + log.error("Expected value %s does not equal result %s", expected, result) + return False + elif expected != result: + log.error("Object of type %s: %s does not match %s", type(expected), expected, result) + return False + return True + + +class OpenLineageTestOperator(BaseOperator): + """ + This operator is added for system testing purposes. + + It compares expected event templates set on initialization with ones emitted by OpenLineage integration + and stored in Variables by VariableTransport. + :param event_templates: dictionary where key is the key used by VariableTransport in format of + <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value is event template (fragment) + that need to be in received events. + :param file_path: alternatively, file_path pointing to file with event templates will be used + :raises: ValueError if the received events do not match with expected ones. + """ + + def __init__( + self, event_templates: dict[str, dict] | None = None, file_path: str | None = None, **kwargs + ): + super().__init__(**kwargs) + self.event_templates = event_templates + self.file_path = file_path + if self.event_templates and self.file_path: + raise ValueError("Can't pass both event_templates and file_path") + if self.file_path is not None: + self.event_templates = {} + with open(file_path) as f: # type: ignore[arg-type] + events = json.load(f) + for event in events: + key = event["job"]["name"] + ".event." + event["eventType"].lower() + self.event_templates[key] = event + + def execute(self, context: Context): + for key, template in self.event_templates.items(): # type: ignore[union-attr] + send_event = Variable.get(key=key) + self.log.error("Events: %s", send_event) + if send_event: + self.log.error("Events: %s, %s, %s", send_event, len(send_event), type(send_event)) + if not match(template, json.loads(send_event)): + raise ValueError("Event received does not match one specified in test") diff --git a/providers/tests/system/openlineage/__init__.py b/providers/tests/system/openlineage/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/providers/tests/system/openlineage/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/tests/system/openlineage/event.json b/providers/tests/system/openlineage/event.json new file mode 100644 index 0000000000..6ce69aee73 --- /dev/null +++ b/providers/tests/system/openlineage/event.json @@ -0,0 +1,5 @@ +[ + { + "eventType": "RUNNING" + } +] diff --git a/providers/tests/system/openlineage/example_openlineage.py b/providers/tests/system/openlineage/example_openlineage.py new file mode 100644 index 0000000000..eead3b7ed6 --- /dev/null +++ b/providers/tests/system/openlineage/example_openlineage.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.openlineage.utils.operator import OpenLineageTestOperator + + +def do_nothing(): + pass + + +default_args = {"start_date": datetime(2021, 1, 1), "retries": 1} + +# Instantiate the DAG +with DAG( + "openlineage_basic_dag", + default_args=default_args, + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, +) as dag: + nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing) + + check_events = OpenLineageTestOperator(task_id="check_events", file_path="event.json") + + nothing_task >> check_events + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
