This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-systemtests
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0935241484a515909351b143df96ac8c9371ebcb
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Thu Oct 10 16:06:07 2024 +0200

    add basic system tests for OpenLineage
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../providers/openlineage/transport/__init__.py    |  16 ++
 .../providers/openlineage/transport/variable.py    |  56 ++++++
 .../providers/openlineage/utils/operator.py        | 200 +++++++++++++++++++++
 providers/tests/system/openlineage/__init__.py     |  16 ++
 providers/tests/system/openlineage/event.json      |   5 +
 .../system/openlineage/example_openlineage.py      |  50 ++++++
 6 files changed, 343 insertions(+)

diff --git a/providers/src/airflow/providers/openlineage/transport/__init__.py b/providers/src/airflow/providers/openlineage/transport/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/providers/src/airflow/providers/openlineage/transport/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/src/airflow/providers/openlineage/transport/variable.py b/providers/src/airflow/providers/openlineage/transport/variable.py
new file mode 100644
index 0000000000..2841c1e0ea
--- /dev/null
+++ b/providers/src/airflow/providers/openlineage/transport/variable.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from openlineage.client.serde import Serde
+from openlineage.client.transport import Transport
+
+from airflow.models import Variable
+from airflow.plugins_manager import AirflowPlugin, plugins
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from openlineage.client.client import Event
+
+
class VariableTransport(Transport, LoggingMixin):
    """
    This transport sends OpenLineage events to Variables.

    Key schema is <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, with the event type lowercased.
    It's made to be used in system tests, stored data read by OpenLineageTestOperator.
    Every emit overwrites the Variable stored under its key, so for a given task only the
    latest event of each type is kept.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def emit(self, event: Event):
        """
        Serialize ``event`` to JSON and store it in the Variable of the task instance being processed.

        The task instance is read from ``current_ti`` of the first listener registered by the
        OpenLineage provider plugin.

        :param event: OpenLineage event to store; its ``eventType`` becomes part of the key.
        :raises RuntimeError: if the OpenLineage plugin is not registered, or its listener has no
            current task instance.
        """
        from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin

        # ``next`` needs an explicit default: without one, a missing plugin raised StopIteration
        # and the RuntimeError below was unreachable. ``plugins`` is guarded against None as well
        # (NOTE(review): assumed to be None until Airflow loads plugins — confirm).
        plugin: AirflowPlugin | None = next(
            (p for p in (plugins or []) if isinstance(p, OpenLineageProviderPlugin)),
            None,
        )
        if plugin is None:
            raise RuntimeError("OpenLineage listener should be set up here")
        listener = plugin.listeners[0]  # type: ignore
        ti = listener.current_ti  # type: ignore
        if ti is None:
            raise RuntimeError("OpenLineage listener has no current task instance to key the event by")
        key = f"{ti.dag_id}.{ti.task_id}.event.{event.eventType.value.lower()}"  # type: ignore[union-attr]
        Variable.set(key=key, value=Serde.to_json(event))
diff --git a/providers/src/airflow/providers/openlineage/utils/operator.py b/providers/src/airflow/providers/openlineage/utils/operator.py
new file mode 100644
index 0000000000..b495a4f4d1
--- /dev/null
+++ b/providers/src/airflow/providers/openlineage/utils/operator.py
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import typing
+import uuid
+from urllib.parse import urlparse
+
+from dateutil.parser import parse
+from jinja2 import Environment
+
+from airflow.models import BaseOperator
+from airflow.models.variable import Variable
+
+if typing.TYPE_CHECKING:
+    from airflow.utils.context import Context
+
# Module logger; match() reports every mismatch through it before returning False.
log = logging.getLogger(__name__)
+
+
def any(result):
    """
    Jinja helper exposed as ``any``: hand the received value back untouched.

    Intended for templates that only need a key to be present, whatever its value.
    NOTE(review): this shadows the builtin ``any`` inside this module; the module-level name
    is kept so existing references keep working.
    """
    value = result
    return value
+
+
def is_datetime(result):
    """
    Jinja helper: tell whether ``result`` can be parsed by ``dateutil.parser.parse``.

    :return: the string ``"true"`` or ``"false"``. Strings are returned so a rendered template
        can be compared against ``"true"``.
    """
    try:
        parse(result)
    except Exception:
        return "false"
    return "true"
+
+
def is_uuid(result):
    """
    Jinja helper: tell whether ``result`` is accepted by ``uuid.UUID``.

    :return: the string ``"true"`` or ``"false"``, so the rendered template can be compared
        against ``"true"``.
    """
    try:
        uuid.UUID(result)
    except Exception:
        return "false"
    return "true"
+
+
def env_var(var: str, default: str | None = None) -> str:
    """
    Jinja helper: return the value of environment variable ``var``.

    Falls back to ``default`` when the variable is unset.

    :raises ValueError: if the variable is unset and no default (``None``) was given.
    """
    value = os.environ.get(var)
    if value is not None:
        return value
    if default is None:
        raise ValueError(f"Env var required but not provided: '{var}'")
    return default
+
+
def not_match(result, pattern) -> str:
    """
    Jinja helper: assert that ``pattern`` does not occur in ``result``.

    :return: ``"true"`` when ``pattern`` is absent.
    :raises ValueError: when ``pattern`` is found in ``result``.
    """
    if pattern not in result:
        return "true"
    raise ValueError(f"Found {pattern} in {result}")
+
+
def url_scheme_authority(url) -> str:
    """Jinja filter: keep only ``scheme://netloc`` of ``url``, dropping path, query and fragment."""
    scheme, netloc = urlparse(url)[:2]
    return f"{scheme}://{netloc}"
+
+
def url_path(url) -> str:
    """Jinja filter: return only the path component of ``url``."""
    parsed = urlparse(url)
    path = parsed.path
    return path
+
+
def setup_jinja() -> Environment:
    """
    Build the Jinja environment that renders templated expected values.

    The predicate/lookup helpers become globals (callable inside ``{{ ... }}``) and the URL
    helpers become filters (usable as ``{{ result | url_path }}``).
    """
    jinja_env = Environment()
    jinja_env.globals.update(
        {
            "any": any,
            "is_datetime": is_datetime,
            "is_uuid": is_uuid,
            "env_var": env_var,
            "not_match": not_match,
        }
    )
    jinja_env.filters.update(
        {
            "url_scheme_authority": url_scheme_authority,
            "url_path": url_path,
        }
    )
    return jinja_env
+
+
# Module-wide Jinja environment, built once at import time; match() renders templated expected values with it.
env = setup_jinja()
+
+
def match(expected, result) -> bool:
    """
    Check if result is "equal" to expected value.

    Omits keys not specified in expected value and resolves any jinja templates found.

    * dict: ``result`` must be a dict holding every key of ``expected`` with a matching value;
      extra keys in ``result`` are ignored.
    * list: ``result`` must be a list (or tuple) of the same length whose items match pairwise.
    * str containing ``{{``: rendered with ``result`` in scope; it matches when the rendered text is
      ``"true"`` or equals ``str(result)`` (so e.g. ``{{ any(result) }}`` also accepts non-string values).
    * anything else: plain ``==``.

    Every mismatch is logged and reported by returning False.

    :param expected: expected value or template fragment.
    :param result: received value, e.g. a decoded JSON event.
    :return: True if ``result`` matches ``expected``.
    """
    if isinstance(expected, dict):
        if not isinstance(result, dict):
            # Without this guard a string result would be probed with substring checks
            # (``k in "..."``), and None or numbers would raise instead of reporting a mismatch.
            log.error("Expected a dictionary, received %s: %s\nExpected %s", type(result), result, expected)
            return False
        # Take a look only at keys present at expected dictionary
        for k, v in expected.items():
            if k not in result:
                log.error("Key %s not in received event %s\nExpected %s", k, result, expected)
                return False
            if not match(v, result[k]):
                log.error(
                    "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s",
                    k,
                    v,
                    result[k],
                    expected,
                    result,
                )
                return False
    elif isinstance(expected, list):
        if not isinstance(result, (list, tuple)):
            # A string of the same length would otherwise be compared character by character.
            log.error("Expected a list, received %s: %s\nExpected %s", type(result), result, expected)
            return False
        if len(expected) != len(result):
            log.error("Length does not match: expected %d, result: %d", len(expected), len(result))
            return False
        for i, (x, y) in enumerate(zip(expected, result)):
            if not match(x, y):
                log.error(
                    "List not matched at %d\nexpected:\n%s\nresult: \n%s",
                    i,
                    json.dumps(x),
                    json.dumps(y),
                )
                return False
    elif isinstance(expected, str):
        if "{{" in expected:
            # Evaluate jinja: in some cases, we want to check only if key exists, or if
            # value has the right type
            try:
                rendered = env.from_string(expected).render(result=result)
            except (ValueError, TypeError, AttributeError) as e:
                # Helpers signal failure with ValueError (env_var, not_match); applying them to a
                # value of an unexpected type (e.g. None) raises TypeError/AttributeError instead.
                log.error("Error rendering jinja template %s: %s", expected, e)
                return False
            # Jinja always renders text, so compare against the textual form of the result.
            if rendered in ("true", str(result)):
                return True
            log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
            return False
        elif expected != result:
            log.error("Expected value %s does not equal result %s", expected, result)
            return False
    elif expected != result:
        log.error("Object of type %s: %s does not match %s", type(expected), expected, result)
        return False
    return True
+
+
class OpenLineageTestOperator(BaseOperator):
    """
    This operator is added for system testing purposes.

    It compares expected event templates set on initialization with ones emitted by OpenLineage integration
    and stored in Variables by VariableTransport.

    :param event_templates: dictionary where key is the key used by VariableTransport in format of
        <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value is event template (fragment)
        that need to be in received events.
    :param file_path: alternatively, path to a JSON file holding a list of event templates. Each template
        must contain ``job.name`` and ``eventType``; its Variable key is built as
        ``<job.name>.event.<eventType lowercased>``. The file is read at construction time, and a
        relative path is opened relative to the current working directory.
    :raises ValueError: if both or neither of ``event_templates`` and ``file_path`` are passed, if a
        template from ``file_path`` lacks ``job.name`` or ``eventType``, or, on execute, if an expected
        event was not stored or does not match its template.
    """

    def __init__(
        self, event_templates: dict[str, dict] | None = None, file_path: str | None = None, **kwargs
    ):
        super().__init__(**kwargs)
        self.event_templates = event_templates
        self.file_path = file_path
        if self.event_templates and self.file_path:
            raise ValueError("Can't pass both event_templates and file_path")
        if self.file_path is not None:
            self.event_templates = self._load_templates(self.file_path)
        elif self.event_templates is None:
            # Fail at construction instead of with an AttributeError on None in execute().
            raise ValueError("Either event_templates or file_path must be passed")

    @staticmethod
    def _load_templates(file_path: str) -> dict[str, dict]:
        """Read a JSON list of event templates and key each one the same way VariableTransport does."""
        with open(file_path, encoding="utf-8") as f:
            events = json.load(f)
        templates: dict[str, dict] = {}
        for event in events:
            try:
                key = event["job"]["name"] + ".event." + event["eventType"].lower()
            except (KeyError, TypeError, AttributeError) as e:
                raise ValueError(
                    f"Event template in {file_path} needs 'job.name' and 'eventType' to build its key: {event}"
                ) from e
            templates[key] = event
        return templates

    def execute(self, context: Context):
        for key, template in (self.event_templates or {}).items():
            stored_event = Variable.get(key=key, default_var=None)
            if stored_event is None:
                raise ValueError(f"No event stored in Variable {key!r}; expected one matching {template}")
            self.log.info("Event stored in Variable %s: %s", key, stored_event)
            if not match(template, json.loads(stored_event)):
                raise ValueError("Event received does not match one specified in test")
diff --git a/providers/tests/system/openlineage/__init__.py b/providers/tests/system/openlineage/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/providers/tests/system/openlineage/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/tests/system/openlineage/event.json b/providers/tests/system/openlineage/event.json
new file mode 100644
index 0000000000..6ce69aee73
--- /dev/null
+++ b/providers/tests/system/openlineage/event.json
@@ -0,0 +1,5 @@
+[
+    {
+        "eventType": "RUNNING"
+    }
+]
diff --git a/providers/tests/system/openlineage/example_openlineage.py b/providers/tests/system/openlineage/example_openlineage.py
new file mode 100644
index 0000000000..eead3b7ed6
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
from __future__ import annotations

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.openlineage.utils.operator import OpenLineageTestOperator
+
+
def do_nothing():
    """No-op callable: ``do_nothing_task`` only has to run so that ``check_events`` has something to inspect."""
+
+
# start_date is passed to the DAG directly below, so it is not repeated here.
default_args = {"retries": 1}

# Instantiate the DAG
with DAG(
    "openlineage_basic_dag",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    nothing_task = PythonOperator(task_id="do_nothing_task", python_callable=do_nothing)

    # Resolve event.json next to this file: a bare "event.json" is opened relative to the process
    # working directory, which need not be this directory when the system test runs.
    # NOTE(review): OpenLineageTestOperator keys every template by its job.name, while
    # VariableTransport stores events under "<dag_id>.<task_id>". event.json currently has no "job"
    # entry, so it needs {"job": {"name": "openlineage_basic_dag.do_nothing_task"}} to be loadable.
    check_events = OpenLineageTestOperator(
        task_id="check_events",
        file_path=os.path.join(os.path.dirname(__file__), "event.json"),
    )

    nothing_task >> check_events
+
+
# Imported after the DAG definition (system-test convention), hence the E402 suppression.
from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

Reply via email to