This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f575ab700a tests: Add OL system tests for deferrable
TriggerDagRunOperator (#58933)
1f575ab700a is described below
commit 1f575ab700a5bc3624e47707bc5bf32309227b7d
Author: Kacper Muda <[email protected]>
AuthorDate: Tue Dec 2 20:12:19 2025 +0100
tests: Add OL system tests for deferrable TriggerDagRunOperator (#58933)
---
.../example_openlineage_defer_simple_dag.py | 10 +-
.../example_openlineage_mapped_simple_dag.py | 4 +-
.../example_openlineage_trigger_dag_deferrable.py | 93 +++
.../expected_events/openlineage_trigger_dag.json | 298 +++++-----
.../openlineage_trigger_dag_deferrable.json | 660 +++++++++++++++++++++
.../tests/system/openlineage/operator.py | 16 +-
6 files changed, 918 insertions(+), 163 deletions(-)
diff --git
a/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py
b/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py
index 74275ee11d5..145fdca6505 100644
---
a/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py
+++
b/providers/openlineage/tests/system/openlineage/example_openlineage_defer_simple_dag.py
@@ -24,14 +24,12 @@ It checks:
from __future__ import annotations
-import warnings
from datetime import datetime, timedelta
from airflow import DAG
-from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import Variable
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync
+from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor
from system.openlineage.expected_events import get_expected_event_file_path
from system.openlineage.operator import OpenLineageTestOperator
@@ -54,9 +52,7 @@ with DAG(
) as dag:
# Timedelta is compared to the DAGRun start timestamp, which can occur
long before a worker picks up the
# task. We need to ensure the sensor gets deferred at least once, so
setting 180s.
- with warnings.catch_warnings(): # TODO Switch to TimeDeltaSensor when
deferrable is released
- warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
- wait = TimeDeltaSensorAsync(task_id="wait",
delta=timedelta(seconds=180))
+ wait = TimeDeltaSensor(task_id="wait", delta=timedelta(seconds=180),
deferrable=True)
check_events_number = PythonOperator(
task_id="check_events_number", python_callable=check_events_number_func
@@ -65,7 +61,7 @@ with DAG(
check_events = OpenLineageTestOperator(
task_id="check_events",
file_path=get_expected_event_file_path(DAG_ID),
- allow_duplicate_events=True,
+
allow_duplicate_events_regex="openlineage_defer_simple_dag.wait.event.start",
)
wait >> check_events_number >> check_events
diff --git
a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py
b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py
index 1e73d4edcf9..a80608edd23 100644
---
a/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py
+++
b/providers/openlineage/tests/system/openlineage/example_openlineage_mapped_simple_dag.py
@@ -78,7 +78,9 @@ with DAG(
)
check_events = OpenLineageTestOperator(
- task_id="check_events",
file_path=get_expected_event_file_path(DAG_ID), allow_duplicate_events=True
+ task_id="check_events",
+ file_path=get_expected_event_file_path(DAG_ID),
+
allow_duplicate_events_regex="openlineage_mapped_simple_dag.add_one.event.(start|complete)",
)
sum_it(added_values) >> check_events_number >> check_events
diff --git
a/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py
new file mode 100644
index 00000000000..4a4f8ee934b
--- /dev/null
+++
b/providers/openlineage/tests/system/openlineage/example_openlineage_trigger_dag_deferrable.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Simple DAG that triggers another simple DAG in deferrable mode.
+
+It checks:
+ - task's trigger_dag_id, trigger_run_id, deferrable attribute
+ - DAGRun START and COMPLETE events, for the triggered DAG
+ - automatic injection of OL parent and root info to DAGRun conf
+ - multiple levels of triggering
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.providers.standard.operators.bash import BashOperator
+from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
+
+from system.openlineage.expected_events import get_expected_event_file_path
+from system.openlineage.operator import OpenLineageTestOperator
+
+DAG_ID = "openlineage_trigger_dag_deferrable"
+
+with DAG(
+ dag_id=DAG_ID,
+ start_date=datetime(2021, 1, 1),
+ schedule=None,
+ catchup=False,
+ default_args={"retries": 0},
+) as dag:
+ trigger_dagrun = TriggerDagRunOperator(
+ task_id="trigger_dagrun",
+ trigger_dag_id="openlineage_trigger_dag_deferrable_child__notrigger",
+ wait_for_completion=True,
+ conf={"some_config": "value1"},
+ poke_interval=5,
+ deferrable=True,
+ )
+
+ check_events = OpenLineageTestOperator(
+ task_id="check_events",
+ file_path=get_expected_event_file_path(DAG_ID),
+
allow_duplicate_events_regex="openlineage_trigger_dag_deferrable.trigger_dagrun.event.start",
+ )
+
+ trigger_dagrun >> check_events
+
+
+with DAG(
+ dag_id="openlineage_trigger_dag_deferrable_child__notrigger",
+ start_date=datetime(2021, 1, 1),
+ schedule=None,
+ catchup=False,
+ default_args={"retries": 0},
+) as child_dag:
+ trigger_dagrun2 = TriggerDagRunOperator(
+ task_id="trigger_dagrun2",
+ trigger_dag_id="openlineage_trigger_dag_deferrable_child2__notrigger",
+ wait_for_completion=True,
+ poke_interval=5,
+ )
+
+
+with DAG(
+ dag_id="openlineage_trigger_dag_deferrable_child2__notrigger",
+ start_date=datetime(2021, 1, 1),
+ schedule=None,
+ catchup=False,
+ default_args={"retries": 0},
+) as child_dag2:
+ do_nothing_task = BashOperator(task_id="do_nothing_task",
bash_command="sleep 10;")
+
+
+from tests_common.test_utils.system_tests import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
index 3695408b28f..a32c8659a81 100644
---
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
+++
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag.json
@@ -43,24 +43,24 @@
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
},
"parent": {
- "job": {
- "namespace": "prod_biz",
- "name": "get_files"
- },
- "run": {
- "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
- },
- "root": {
"job": {
- "name": "generate_report_sales_e2e",
- "namespace": "prod_analytics"
+ "namespace": "prod_biz",
+ "name": "get_files"
},
"run": {
- "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
},
"nominalTime": {
"nominalEndTime": "{{ is_datetime(result) }}",
@@ -82,10 +82,10 @@
"name": "openlineage_trigger_dag_child__notrigger",
"facets": {
"documentation": {
- "description": "MD DAG doc",
- "contentType": "text/markdown",
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
+ "description": "MD DAG doc",
+ "contentType": "text/markdown",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
},
"jobType": {
"integration": "AIRFLOW",
@@ -104,30 +104,30 @@
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
},
"tags": {
- "tags": [
- {
- "key": "first",
- "value": "first",
- "source": "AIRFLOW"
- },
- {
- "key": "second@",
- "value": "second@",
- "source": "AIRFLOW"
- },
- {
- "key": "with'quote",
- "value": "with'quote",
- "source": "AIRFLOW"
- },
- {
- "key": "z\"e",
- "value": "z\"e",
- "source": "AIRFLOW"
- }
- ],
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
+ "tags": [
+ {
+ "key": "first",
+ "value": "first",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "second@",
+ "value": "second@",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "with'quote",
+ "value": "with'quote",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "z\"e",
+ "value": "z\"e",
+ "source": "AIRFLOW"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
},
"airflow": {
"taskGroups": {},
@@ -204,24 +204,24 @@
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
},
"parent": {
- "job": {
- "namespace": "prod_biz",
- "name": "get_files"
- },
- "run": {
- "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
- },
- "root": {
"job": {
- "name": "generate_report_sales_e2e",
- "namespace": "prod_analytics"
+ "namespace": "prod_biz",
+ "name": "get_files"
},
"run": {
- "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
},
"nominalTime": {
"nominalEndTime": "{{ is_datetime(result) }}",
@@ -243,10 +243,10 @@
"name": "openlineage_trigger_dag_child__notrigger",
"facets": {
"documentation": {
- "description": "MD DAG doc",
- "contentType": "text/markdown",
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
+ "description": "MD DAG doc",
+ "contentType": "text/markdown",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/DocumentationJobFacet.json\\#\\/\\$defs\\/DocumentationJobFacet\")
}}"
},
"ownership": {
"owners": [
@@ -258,30 +258,30 @@
"_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
},
"tags": {
- "tags": [
- {
- "key": "first",
- "value": "first",
- "source": "AIRFLOW"
- },
- {
- "key": "second@",
- "value": "second@",
- "source": "AIRFLOW"
- },
- {
- "key": "with'quote",
- "value": "with'quote",
- "source": "AIRFLOW"
- },
- {
- "key": "z\"e",
- "value": "z\"e",
- "source": "AIRFLOW"
- }
- ],
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
+ "tags": [
+ {
+ "key": "first",
+ "value": "first",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "second@",
+ "value": "second@",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "with'quote",
+ "value": "with'quote",
+ "source": "AIRFLOW"
+ },
+ {
+ "key": "z\"e",
+ "value": "z\"e",
+ "source": "AIRFLOW"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/TagsJobFacet.json\\#\\/\\$defs\\/TagsJobFacet\")
}}"
},
"jobType": {
"integration": "AIRFLOW",
@@ -298,24 +298,24 @@
"run": {
"facets": {
"parent": {
- "job": {
- "namespace": "{{ result is string }}",
- "name": "openlineage_trigger_dag_child__notrigger"
- },
- "run": {
- "runId": "{{ is_uuid(result) }}"
- },
- "root": {
"job": {
- "name": "generate_report_sales_e2e",
- "namespace": "prod_analytics"
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_child__notrigger"
},
"run": {
- "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
@@ -328,24 +328,24 @@
"run": {
"facets": {
"parent": {
- "job": {
- "namespace": "{{ result is string }}",
- "name": "openlineage_trigger_dag_child__notrigger"
- },
- "run": {
- "runId": "{{ is_uuid(result) }}"
- },
- "root": {
"job": {
- "name": "generate_report_sales_e2e",
- "namespace": "prod_analytics"
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_child__notrigger"
},
"run": {
- "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "generate_report_sales_e2e",
+ "namespace": "prod_analytics"
+ },
+ "run": {
+ "runId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
@@ -359,28 +359,29 @@
"facets": {
"airflow": {
"task": {
- "trigger_dag_id":
"openlineage_trigger_dag_child__notrigger"
+ "trigger_dag_id":
"openlineage_trigger_dag_child__notrigger",
+ "trigger_run_id": "{{
result.startswith('openlineage_trigger_dag_triggering_child_202') }}"
}
},
"parent": {
- "job": {
- "namespace": "{{ result is string }}",
- "name": "openlineage_trigger_dag"
- },
- "run": {
- "runId": "{{ is_uuid(result) }}"
- },
- "root": {
"job": {
- "name": "openlineage_trigger_dag",
- "namespace": "{{ result is string }}"
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag"
},
"run": {
- "runId": "{{ is_uuid(result) }}"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
@@ -394,28 +395,29 @@
"facets": {
"airflow": {
"task": {
- "trigger_dag_id":
"openlineage_trigger_dag_child__notrigger"
+ "trigger_dag_id":
"openlineage_trigger_dag_child__notrigger",
+ "trigger_run_id": "{{
result.startswith('openlineage_trigger_dag_triggering_child_202') }}"
}
},
"parent": {
- "job": {
- "namespace": "{{ result is string }}",
- "name": "openlineage_trigger_dag"
- },
- "run": {
- "runId": "{{ is_uuid(result) }}"
- },
- "root": {
"job": {
- "name": "openlineage_trigger_dag",
- "namespace": "{{ result is string }}"
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag"
},
"run": {
- "runId": "{{ is_uuid(result) }}"
- }
- },
- "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
- "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
}
}
},
diff --git
a/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json
new file mode 100644
index 00000000000..fc274dd93dc
--- /dev/null
+++
b/providers/openlineage/tests/system/openlineage/expected_events/openlineage_trigger_dag_deferrable.json
@@ -0,0 +1,660 @@
+[
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py')
}}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId": "{{ is_uuid(result) }}",
+ "parentJobNamespace": "{{ result is string }}",
+ "parentJobName":
"openlineage_trigger_dag_deferrable.trigger_dagrun",
+ "rootParentRunId": "{{ is_uuid(result) }}",
+ "rootParentJobNamespace": "{{ result is string
}}",
+ "rootParentJobName":
"openlineage_trigger_dag_deferrable"
+ }
+ },
+ "dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{ result.startswith('manual__202') }}",
+ "run_type": "manual",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable.trigger_dagrun"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable_child__notrigger",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ },
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "airflow": {
+ "taskGroups": {},
+ "taskTree": {},
+ "tasks": {
+ "trigger_dagrun2": {
+ "downstream_task_ids": [],
+ "emits_ol_events": "{{ result == true }}",
+ "is_setup": false,
+ "is_teardown": false,
+ "operator":
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator",
+ "ui_label": "trigger_dagrun2"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/JobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "airflowState": {
+ "dagRunState": "success",
+ "tasksState": {
+ "trigger_dagrun2": "success"
+ },
+ "tasksDuration": {
+ "trigger_dagrun2": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py')
}}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "some_config": "value1",
+ "openlineage": {
+ "parentRunId": "{{ is_uuid(result) }}",
+ "parentJobNamespace": "{{ result is string }}",
+ "parentJobName":
"openlineage_trigger_dag_deferrable.trigger_dagrun",
+ "rootParentRunId": "{{ is_uuid(result) }}",
+ "rootParentJobNamespace": "{{ result is string
}}",
+ "rootParentJobName":
"openlineage_trigger_dag_deferrable"
+ }
+ },
+ "dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{ result.startswith('manual__202') }}",
+ "run_type": "manual",
+ "start_date": "{{ is_datetime(result) }}",
+ "duration": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable.trigger_dagrun"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable_child__notrigger",
+ "facets": {
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "trigger_dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2"
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "trigger_dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger",
+ "trigger_run_id": "{{ result.startswith('manual__202')
}}"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2"
+ }
+ },
+ {
+ "eventType": "START",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py')
}}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "openlineage": {
+ "parentRunId": "{{ is_uuid(result) }}",
+ "parentJobNamespace": "{{ result is string }}",
+ "parentJobName":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2",
+ "rootParentRunId": "{{ is_uuid(result) }}",
+ "rootParentJobNamespace": "{{ result is string
}}",
+ "rootParentJobName":
"openlineage_trigger_dag_deferrable"
+ }
+ },
+ "dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{ result.startswith('manual__202') }}",
+ "run_type": "manual",
+ "start_date": "{{ is_datetime(result) }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable_child2__notrigger",
+ "facets": {
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ },
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "airflow": {
+ "taskGroups": {},
+ "taskTree": {},
+ "tasks": {
+ "do_nothing_task": {
+ "downstream_task_ids": [],
+ "emits_ol_events": "{{ result == true }}",
+ "is_setup": false,
+ "is_teardown": false,
+ "operator":
"airflow.providers.standard.operators.bash.BashOperator",
+ "ui_label": "do_nothing_task"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/JobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "eventTime": "{{ is_datetime(result) }}",
+ "producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunEvent$\")
}}",
+ "inputs": [],
+ "outputs": [],
+ "run": {
+ "runId": "{{ is_uuid(result) }}",
+ "facets": {
+ "airflowState": {
+ "dagRunState": "success",
+ "tasksState": {
+ "do_nothing_task": "success"
+ },
+ "tasksDuration": {
+ "do_nothing_task": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "airflowDagRun": {
+ "dag": {
+ "dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger",
+ "fileloc": "{{
result.endswith('openlineage/example_openlineage_trigger_dag_deferrable.py')
}}",
+ "owner": "airflow",
+ "owner_links": {},
+ "start_date": "{{ is_datetime(result) }}",
+ "timetable": {}
+ },
+ "dagRun": {
+ "conf": {
+ "openlineage": {
+ "parentRunId": "{{ is_uuid(result) }}",
+ "parentJobNamespace": "{{ result is string }}",
+ "parentJobName":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2",
+ "rootParentRunId": "{{ is_uuid(result) }}",
+ "rootParentJobNamespace": "{{ result is string
}}",
+ "rootParentJobName":
"openlineage_trigger_dag_deferrable"
+ }
+ },
+ "dag_id":
"openlineage_trigger_dag_deferrable_child2__notrigger",
+ "data_interval_end": "{{ is_datetime(result) }}",
+ "data_interval_start": "{{ is_datetime(result) }}",
+ "logical_date": "{{ is_datetime(result) }}",
+ "run_id": "{{ result.startswith('manual__202') }}",
+ "run_type": "manual",
+ "start_date": "{{ is_datetime(result) }}",
+ "duration": "{{ result is number }}"
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/[\\d-]+\\/OpenLineage.json\\#\\/\\$defs\\/RunFacet\")
}}"
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child__notrigger.trigger_dagrun2"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ },
+ "nominalTime": {
+ "nominalEndTime": "{{ is_datetime(result) }}",
+ "nominalStartTime": "{{ is_datetime(result) }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/NominalTimeRunFacet.json\\#\\/\\$defs\\/NominalTimeRunFacet$\")
}}"
+ },
+ "processing_engine": {
+ "name": "Airflow",
+ "openlineageAdapterVersion": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "version": "{{ regex_match(result,
\"^[\\d]+\\.[\\d]+\\.[\\d]+.*\") }}",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ProcessingEngineRunFacet.json\\#\\/\\$defs\\/ProcessingEngineRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable_child2__notrigger",
+ "facets": {
+ "ownership": {
+ "owners": [
+ {
+ "name": "airflow"
+ }
+ ],
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/OwnershipJobFacet.json\\#\\/\\$defs\\/OwnershipJobFacet\")
}}"
+ },
+ "jobType": {
+ "integration": "AIRFLOW",
+ "jobType": "DAG",
+ "processingType": "BATCH",
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/JobTypeJobFacet.json\\#\\/\\$defs\\/JobTypeJobFacet\")
}}"
+ }
+ }
+ }
+ },
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child2__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_dag_deferrable_child2__notrigger.do_nothing_task"
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "run": {
+ "facets": {
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name":
"openlineage_trigger_dag_deferrable_child2__notrigger"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name":
"openlineage_trigger_dag_deferrable_child2__notrigger.do_nothing_task"
+ }
+ },
+ {
+ "eventType": "START",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "deferrable": "{{ result == true }}",
+ "trigger_dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable.trigger_dagrun"
+ }
+ },
+ {
+ "eventType": "COMPLETE",
+ "run": {
+ "facets": {
+ "airflow": {
+ "task": {
+ "deferrable": "{{ result == true }}",
+ "trigger_dag_id":
"openlineage_trigger_dag_deferrable_child__notrigger",
+ "trigger_run_id": "{{ result.startswith('manual__') }}"
+ }
+ },
+ "parent": {
+ "job": {
+ "namespace": "{{ result is string }}",
+ "name": "openlineage_trigger_dag_deferrable"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ },
+ "root": {
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable",
+ "namespace": "{{ result is string }}"
+ },
+ "run": {
+ "runId": "{{ is_uuid(result) }}"
+ }
+ },
+ "_producer": "{{ regex_match(result,
\"^https:\\/\\/github.com/apache/airflow/tree/providers-openlineage\\/[\\d]+\\.[\\d]+\\.[\\d]+.*$\")
}}",
+ "_schemaURL": "{{ regex_match(result,
\"^https:\\/\\/openlineage.io\\/spec\\/facets\\/[\\d-]+\\/ParentRunFacet.json\\#\\/\\$defs\\/ParentRunFacet$\")
}}"
+ }
+ }
+ },
+ "job": {
+ "name": "openlineage_trigger_dag_deferrable.trigger_dagrun"
+ }
+ }
+]
diff --git a/providers/openlineage/tests/system/openlineage/operator.py
b/providers/openlineage/tests/system/openlineage/operator.py
index 000696a95b1..0bc8aa7c003 100644
--- a/providers/openlineage/tests/system/openlineage/operator.py
+++ b/providers/openlineage/tests/system/openlineage/operator.py
@@ -188,7 +188,7 @@ class OpenLineageTestOperator(BaseOperator):
:param event_templates: dictionary where key is the key used by
VariableTransport in format of <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value
is event template (fragment) that need to be in received events.
:param file_path: alternatively, file_path pointing to file with event
templates will be used
:param env: jinja environment used to render event templates
- :param allow_duplicate_events: if set to True, allows multiple events for
the same key
+ :param allow_duplicate_events_regex: regex pattern; keys matching it are
allowed to have multiple events.
:param clear_variables: if set to True, clears only variables to be
checked after all events are checked or if any check fails
:raises: ValueError if the received events do not match with expected ones.
"""
@@ -198,7 +198,7 @@ class OpenLineageTestOperator(BaseOperator):
event_templates: dict[str, dict] | None = None,
file_path: str | None = None,
env: Environment = setup_jinja(),
- allow_duplicate_events: bool = False,
+ allow_duplicate_events_regex: str | None = None,
clear_variables: bool = True,
**kwargs,
):
@@ -206,8 +206,8 @@ class OpenLineageTestOperator(BaseOperator):
self.event_templates = event_templates
self.file_path = file_path
self.env = env
- self.multiple_events = allow_duplicate_events
- self.delete = clear_variables
+ self.allow_duplicate_events_regex = allow_duplicate_events_regex
+ self.clear_variables = clear_variables
if self.event_templates and self.file_path:
raise ValueError("Can't pass both event_templates and file_path")
@@ -234,13 +234,15 @@ class OpenLineageTestOperator(BaseOperator):
)
if len(actual_events) == 0:
raise ValueError(f"No event for key {key}")
- if len(actual_events) != 1 and not self.multiple_events:
- raise ValueError(f"Expected one event for key {key}, got
{len(actual_events)}")
+ if len(actual_events) != 1:
+ regex = self.allow_duplicate_events_regex
+ if regex is None or not re.fullmatch(regex, key):
+ raise ValueError(f"Expected one event for key {key},
got {len(actual_events)}")
# Last event is checked against the template, this will allow
to f.e. check change in try_num
if not match(template, json.loads(actual_events[-1]),
self.env):
raise ValueError("Event received does not match one
specified in test")
finally:
- if self.delete:
+ if self.clear_variables:
for key in self.event_templates: # type: ignore[union-attr]
log.info("Removing variable `%s`", key)
Variable.delete(key=key)