This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch openlineage-systemtests
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fd1ba939e0e5b56921d958c67fbca2d789c4fa5e
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Thu Oct 10 16:06:07 2024 +0200

    add basic system tests for OpenLineage
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../apache-airflow-providers-openlineage/index.rst |   8 +
 .../providers/openlineage/plugins/adapter.py       |   4 +-
 providers/tests/system/openlineage/__init__.py     |  16 ++
 providers/tests/system/openlineage/conftest.py     |  36 ++++
 .../system/openlineage/example_openlineage.json    |  38 ++++
 .../system/openlineage/example_openlineage.py      |  51 +++++
 .../example_openlineage_mapped_sensor.json         |  75 ++++++++
 .../example_openlineage_mapped_sensor.py           |  81 ++++++++
 providers/tests/system/openlineage/operator.py     | 214 +++++++++++++++++++++
 .../tests/system/openlineage/transport/__init__.py |  16 ++
 .../tests/system/openlineage/transport/variable.py |  53 +++++
 11 files changed, 590 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow-providers-openlineage/index.rst 
b/docs/apache-airflow-providers-openlineage/index.rst
index e0c1db9a757..2b1ccda0a7a 100644
--- a/docs/apache-airflow-providers-openlineage/index.rst
+++ b/docs/apache-airflow-providers-openlineage/index.rst
@@ -56,6 +56,14 @@
     PyPI Repository 
<https://pypi.org/project/apache-airflow-providers-openlineage/>
     Installing from sources <installing-providers-from-sources>
 
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: System tests
+
+    System Tests <_api/tests/system/openlineage/index>
+
+
 .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE 
OVERWRITTEN AT RELEASE TIME!
 
 
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py 
b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index 688f2d65a54..15e64d63164 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -156,10 +156,10 @@ class OpenLineageAdapter(LoggingMixin):
                 stack.enter_context(Stats.timer("ol.emit.attempts"))
                 self._client.emit(redacted_event)
                 self.log.debug("Successfully emitted OpenLineage event of id 
%s", event.run.runId)
-        except Exception as e:
+        except Exception:
             Stats.incr("ol.emit.failed")
             self.log.warning("Failed to emit OpenLineage event of id %s", 
event.run.runId)
-            self.log.debug("OpenLineage emission failure: %s", e)
+            self.log.debug("OpenLineage emission failure: %s", exc_info=True)
 
         return redacted_event
 
diff --git a/providers/tests/system/openlineage/__init__.py 
b/providers/tests/system/openlineage/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/tests/system/openlineage/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/tests/system/openlineage/conftest.py 
b/providers/tests/system/openlineage/conftest.py
new file mode 100644
index 00000000000..48d568b307e
--- /dev/null
+++ b/providers/tests/system/openlineage/conftest.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.listeners.listener import get_listener_manager
+from airflow.providers.openlineage.plugins.listener import OpenLineageListener
+
+from providers.tests.system.openlineage.transport.variable import 
VariableTransport
+
+
[email protected](autouse=True)
+def set_transport_variable():
+    lm = get_listener_manager()
+    lm.clear()
+    listener = OpenLineageListener()
+    listener.adapter._client = 
listener.adapter.get_or_create_openlineage_client()
+    listener.adapter._client.transport = VariableTransport()
+    lm.add_listener(listener)
+    yield
+    lm.clear()
diff --git a/providers/tests/system/openlineage/example_openlineage.json 
b/providers/tests/system/openlineage/example_openlineage.json
new file mode 100644
index 00000000000..0db8bc53e22
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage.json
@@ -0,0 +1,38 @@
+[
+    {
+        "eventType": "START",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_basic_dag.do_nothing_task",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    },
+    {
+        "eventType": "COMPLETE",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_basic_dag.do_nothing_task",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    }
+]
diff --git a/providers/tests/system/openlineage/example_openlineage.py 
b/providers/tests/system/openlineage/example_openlineage.py
new file mode 100644
index 00000000000..c5263f7dd73
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.standard.operators.python import PythonOperator
+
+from providers.tests.system.openlineage.operator import OpenLineageTestOperator
+
+
+def do_nothing():
+    pass
+
+
+default_args = {"start_date": datetime(2021, 1, 1), "retries": 1}
+
+# Instantiate the DAG
+with DAG(
+    "openlineage_basic_dag",
+    default_args=default_args,
+    start_date=datetime(2021, 1, 1),
+    schedule=None,
+    catchup=False,
+) as dag:
+    nothing_task = PythonOperator(task_id="do_nothing_task", 
python_callable=do_nothing)
+
+    check_events = OpenLineageTestOperator(task_id="check_events", 
file_path="example_openlineage.json")
+
+    nothing_task >> check_events
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json 
b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json
new file mode 100644
index 00000000000..c112fb35448
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.json
@@ -0,0 +1,75 @@
+[
+    {
+        "eventType": "START",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    },
+    {
+        "eventType": "COMPLETE",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    },
+    {
+        "eventType": "START",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    },
+
+    {
+        "eventType": "COMPLETE",
+        "eventTime": "{{ is_datetime(result) }}",
+        "run": {
+            "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+            "namespace": "default",
+            "name": "openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds",
+            "facets": {
+                "jobType": {
+                    "integration": "AIRFLOW",
+                    "jobType": "TASK",
+                    "processingType": "BATCH"
+                }
+            }
+        }
+    }
+]
diff --git 
a/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py 
b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py
new file mode 100644
index 00000000000..a6b8e062265
--- /dev/null
+++ b/providers/tests/system/openlineage/example_openlineage_mapped_sensor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.models import Variable
+from airflow.providers.standard.operators.python import PythonOperator
+from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor
+
+from providers.tests.system.openlineage.operator import OpenLineageTestOperator
+
+
+def my_task(task_number):
+    print(os.getcwd())
+    print(f"Executing task number: {task_number}")
+
+
+def check_start_amount_func():
+    start_sensor_key = 
"openlineage_sensor_mapped_tasks_dag.wait_for_10_seconds.event.start"  # type: 
ignore[union-attr]
+    events = Variable.get(start_sensor_key, deserialize_json=True)
+    if len(events) < 2:
+        raise ValueError(f"Expected at least 2 events, got {len(events)}")
+
+
+with DAG(
+    dag_id="openlineage_sensor_mapped_tasks_dag",
+    start_date=datetime(2021, 1, 1),
+    schedule=None,
+    catchup=False,
+) as dag:
+    wait_for_10_seconds = TimeDeltaSensor(
+        task_id="wait_for_10_seconds",
+        mode="reschedule",
+        poke_interval=5,
+        delta=timedelta(seconds=10),
+    )
+
+    mapped_tasks = [
+        PythonOperator(
+            task_id=f"mapped_task_{i}",
+            python_callable=my_task,
+            op_args=[i],
+        )
+        for i in range(2)
+    ]
+
+    check_start_amount = PythonOperator(
+        task_id="check_order",
+        python_callable=check_start_amount_func,
+    )
+
+    check_events = OpenLineageTestOperator(
+        task_id="check_events",
+        file_path="example_openlineage_mapped_sensor.json",
+        allow_duplicate_events=True,
+    )
+
+    wait_for_10_seconds >> mapped_tasks >> check_start_amount >> check_events
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/providers/tests/system/openlineage/operator.py 
b/providers/tests/system/openlineage/operator.py
new file mode 100644
index 00000000000..a305ca7026a
--- /dev/null
+++ b/providers/tests/system/openlineage/operator.py
@@ -0,0 +1,214 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import uuid
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urlparse
+
+from dateutil.parser import parse
+from jinja2 import Environment
+
+from airflow.models.operator import BaseOperator
+from airflow.models.variable import Variable
+from airflow.utils.session import create_session
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+log = logging.getLogger(__name__)
+
+
+def any(result: Any) -> Any:
+    return result
+
+
+def is_datetime(result: Any) -> str:
+    try:
+        parse(result)
+        return "true"
+    except Exception:
+        pass
+    return "false"
+
+
+def is_uuid(result: Any) -> str:
+    try:
+        uuid.UUID(result)
+        return "true"
+    except Exception:
+        pass
+    return "false"
+
+
+def env_var(var: str, default: str | None = None) -> str:
+    """
+    Use this jinja method to access the environment variable named 'var'.
+
+    If there is no such environment variable set, return the default.
+    If the default is None, raise an exception for an undefined variable.
+    """
+    if var in os.environ:
+        return os.environ[var]
+    elif default is not None:
+        return default
+    raise ValueError(f"Env var required but not provided: '{var}'")
+
+
+def not_match(result: str, pattern: str) -> str:
+    if pattern in result:
+        raise ValueError(f"Found {pattern} in {result}")
+    return "true"
+
+
+def url_scheme_authority(url: str) -> str:
+    parsed = urlparse(url)
+    return f"{parsed.scheme}://{parsed.netloc}"
+
+
+def url_path(url: str) -> str:
+    return urlparse(url).path
+
+
+def setup_jinja() -> Environment:
+    env = Environment()
+    env.globals["any"] = any
+    env.globals["is_datetime"] = is_datetime
+    env.globals["is_uuid"] = is_uuid
+    env.globals["env_var"] = env_var
+    env.globals["not_match"] = not_match
+    env.filters["url_scheme_authority"] = url_scheme_authority
+    env.filters["url_path"] = url_path
+    return env
+
+
+def match(expected, result, env: Environment) -> bool:
+    """
+    Check if result is "equal" to expected value.
+
+    Omits keys not specified in expected value and resolves any jinja 
templates found.
+    """
+    if isinstance(expected, dict):
+        # Take a look only at keys present at expected dictionary
+        if not isinstance(result, dict):
+            log.error("Not a dict: %s\nExpected %s", result, expected)
+            return False
+        for k, v in expected.items():
+            if k not in result:
+                log.error("Key %s not in received event %s\nExpected %s", k, 
result, expected)
+                return False
+            if not match(v, result[k], env):
+                log.error(
+                    "For key %s, expected value %s not equals received 
%s\nExpected: %s, request: %s",
+                    k,
+                    v,
+                    result[k],
+                    expected,
+                    result,
+                )
+                return False
+    elif isinstance(expected, list):
+        if len(expected) != len(result):
+            log.error("Length does not match: expected %d, result: %d", 
len(expected), len(result))
+            return False
+        for i, x in enumerate(expected):
+            if not match(x, result[i], env):
+                log.error(
+                    "List not matched at %d\nexpected:\n%s\nresult: \n%s",
+                    i,
+                    json.dumps(x),
+                    json.dumps(result[i]),
+                )
+                return False
+    elif isinstance(expected, str):
+        if "{{" in expected:
+            # Evaluate jinja: in some cases, we want to check only if key 
exists, or if
+            # value has the right type
+            try:
+                rendered = env.from_string(expected).render(result=result)
+            except ValueError as e:
+                log.error("Error rendering jinja template %s: %s", expected, e)
+                return False
+            if rendered == "true" or rendered == result:
+                return True
+            log.error("Rendered value %s does not equal 'true' or %s", 
rendered, result)
+            return False
+        elif expected != result:
+            log.error("Expected value %s does not equal result %s", expected, 
result)
+            return False
+    elif expected != result:
+        log.error("Object of type %s: %s does not match %s", type(expected), 
expected, result)
+        return False
+    return True
+
+
+class OpenLineageTestOperator(BaseOperator):
+    """
+    This operator is added for system testing purposes.
+
+    It compares expected event templates set on initialization with ones 
emitted by OpenLineage integration
+    and stored in Variables by VariableTransport.
+    :param event_templates: dictionary where key is the key used by 
VariableTransport in format of <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value 
is event template (fragment) that need to be in received events.
+    :param file_path: alternatively, file_path pointing to file with event 
templates will be used
+    :param env: jinja environment used to render event templates
+    :param allow_duplicate_events: if set to True, allows multiple events for 
the same key
+    :param clear_variables: if set to True, clears all variables after 
checking events
+    :raises: ValueError if the received events do not match with expected ones.
+    """
+
+    def __init__(
+        self,
+        event_templates: dict[str, dict] | None = None,
+        file_path: str | None = None,
+        env: Environment = setup_jinja(),
+        allow_duplicate_events: bool = False,
+        clear_variables: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.event_templates = event_templates
+        self.file_path = file_path
+        self.env = env
+        self.multiple_events = allow_duplicate_events
+        self.delete = clear_variables
+        if self.event_templates and self.file_path:
+            raise ValueError("Can't pass both event_templates and file_path")
+
+    def execute(self, context: Context) -> None:
+        if self.file_path is not None:
+            self.event_templates = {}
+            with open(self.file_path) as f:  # type: ignore[arg-type]
+                events = json.load(f)
+            for event in events:
+                key = event["job"]["name"] + ".event." + 
event["eventType"].lower()
+                self.event_templates[key] = event
+        for key, template in self.event_templates.items():  # type: 
ignore[union-attr]
+            send_event = Variable.get(key=key, deserialize_json=True)
+            if len(send_event) == 0:
+                raise ValueError(f"No event for key {key}")
+            if len(send_event) != 1 and not self.multiple_events:
+                raise ValueError(f"Expected one event for key {key}, got 
{len(send_event)}")
+            self.log.info("Events: %s, %s, %s", send_event, len(send_event), 
type(send_event))
+            if not match(template, json.loads(send_event[0]), self.env):
+                raise ValueError("Event received does not match one specified 
in test")
+        if self.delete:
+            with create_session() as session:
+                session.query(Variable).delete()
diff --git a/providers/tests/system/openlineage/transport/__init__.py 
b/providers/tests/system/openlineage/transport/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/tests/system/openlineage/transport/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/tests/system/openlineage/transport/variable.py 
b/providers/tests/system/openlineage/transport/variable.py
new file mode 100644
index 00000000000..ed229b62adf
--- /dev/null
+++ b/providers/tests/system/openlineage/transport/variable.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from openlineage.client.serde import Serde
+from openlineage.client.transport import Transport, get_default_factory
+
+from airflow.models.variable import Variable
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+if TYPE_CHECKING:
+    from openlineage.client.client import Event
+
+
+class VariableTransport(Transport, LoggingMixin):
+    """
+    This transport sends OpenLineage events to Variables.
+
+    Key schema is <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>.
+    It's made to be used in system tests, stored data read by 
OpenLineageTestOperator.
+    """
+
+    kind = "variable"
+
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+
+    def emit(self, event: Event) -> None:
+        key = f"{event.job.name}.event.{event.eventType.value.lower()}"  # 
type: ignore[union-attr]
+        event_str = Serde.to_json(event)
+        if (var := Variable.get(key=key, default_var=None, 
deserialize_json=True)) is not None:
+            Variable.set(key=key, value=var + [event_str], serialize_json=True)
+        else:
+            Variable.set(key=key, value=[event_str], serialize_json=True)
+
+
+get_default_factory().register_transport(VariableTransport.kind, 
VariableTransport)

Reply via email to