This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 00db98b5fe fix: cast list to flattened string in openlineage
InfoJsonEncodable (#41786)
00db98b5fe is described below
commit 00db98b5fea9c6341972d07b9644ac7e563789c1
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 13:26:36 2024 +0200
fix: cast list to flattened string in openlineage InfoJsonEncodable (#41786)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/utils/utils.py | 2 +-
.../providers/openlineage/plugins/test_adapter.py | 99 +++++++++++-----------
tests/providers/openlineage/plugins/test_utils.py | 4 +-
tests/providers/openlineage/utils/test_utils.py | 2 +-
4 files changed, 52 insertions(+), 55 deletions(-)
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index 316666f2c0..ec58c6e2d7 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -199,7 +199,7 @@ class InfoJsonEncodable(dict):
return value.isoformat()
if isinstance(value, datetime.timedelta):
return f"{value.total_seconds()} seconds"
- if isinstance(value, (set, tuple)):
+ if isinstance(value, (set, list, tuple)):
return str(list(value))
return value
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index 19bba5fffb..18f457c5be 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -528,7 +528,7 @@ def
test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")
-@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
+@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False)
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
@@ -578,7 +578,7 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
"description": "dag desc",
"owner": "airflow",
"start_date": "2024-06-01T00:00:00+00:00",
- "tags": [],
+ "tags": "[]",
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
@@ -587,56 +587,53 @@ def test_emit_dag_started_event(mock_stats_incr,
mock_stats_timer, generate_stat
expected_dag_info["timetable_summary"] = "1 day, 0:00:00"
assert len(client.emit.mock_calls) == 1
- assert (
- call(
- RunEvent(
- eventType=RunState.START,
- eventTime=event_time.isoformat(),
- run=Run(
- runId=random_uuid,
- facets={
- "nominalTime": nominal_time_run.NominalTimeRunFacet(
- nominalStartTime=event_time.isoformat(),
- nominalEndTime=event_time.isoformat(),
- ),
- "airflowDagRun": AirflowDagRunFacet(
- dag=expected_dag_info,
- dagRun={
- "conf": {},
- "dag_id": "dag_id",
- "data_interval_start": event_time.isoformat(),
- "data_interval_end": event_time.isoformat(),
- "external_trigger": None,
- "run_id": run_id,
- "run_type": None,
- "start_date": event_time.isoformat(),
- },
- ),
- "debug": AirflowDebugRunFacet(packages=ANY),
- },
- ),
- job=Job(
- namespace=namespace(),
- name="dag_id",
- facets={
- "documentation":
documentation_job.DocumentationJobFacet(description="dag desc"),
- "ownership": ownership_job.OwnershipJobFacet(
- owners=[
- ownership_job.Owner(name="airflow", type=None),
- ]
- ),
- **job_facets,
- "jobType": job_type_job.JobTypeJobFacet(
- processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
- ),
- },
- ),
- producer=_PRODUCER,
- inputs=[],
- outputs=[],
- )
+ client.emit.assert_called_once_with(
+ RunEvent(
+ eventType=RunState.START,
+ eventTime=event_time.isoformat(),
+ run=Run(
+ runId=random_uuid,
+ facets={
+ "nominalTime": nominal_time_run.NominalTimeRunFacet(
+ nominalStartTime=event_time.isoformat(),
+ nominalEndTime=event_time.isoformat(),
+ ),
+ "airflowDagRun": AirflowDagRunFacet(
+ dag=expected_dag_info,
+ dagRun={
+ "conf": {},
+ "dag_id": "dag_id",
+ "data_interval_start": event_time.isoformat(),
+ "data_interval_end": event_time.isoformat(),
+ "external_trigger": None,
+ "run_id": run_id,
+ "run_type": None,
+ "start_date": event_time.isoformat(),
+ },
+ ),
+ # "debug": AirflowDebugRunFacet(packages=ANY),
+ },
+ ),
+ job=Job(
+ namespace=namespace(),
+ name="dag_id",
+ facets={
+ "documentation":
documentation_job.DocumentationJobFacet(description="dag desc"),
+ "ownership": ownership_job.OwnershipJobFacet(
+ owners=[
+ ownership_job.Owner(name="airflow", type=None),
+ ]
+ ),
+ **job_facets,
+ "jobType": job_type_job.JobTypeJobFacet(
+ processingType="BATCH", integration="AIRFLOW",
jobType="DAG"
+ ),
+ },
+ ),
+ producer=_PRODUCER,
+ inputs=[],
+ outputs=[],
)
- in client.emit.mock_calls
)
mock_stats_incr.assert_not_called()
diff --git a/tests/providers/openlineage/plugins/test_utils.py
b/tests/providers/openlineage/plugins/test_utils.py
index 0be62a5b0c..382f806f23 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -164,7 +164,7 @@ def test_info_json_encodable_without_slots():
}
-def test_info_json_encodable_list_does_not_flatten():
+def test_info_json_encodable_list_does_flatten():
class TestInfo(InfoJsonEncodable):
includes = ["alist"]
@@ -174,7 +174,7 @@ def test_info_json_encodable_list_does_not_flatten():
obj = Test(["a", "b", "c"])
- assert json.loads(json.dumps(TestInfo(obj))) == {"alist": ["a", "b", "c"]}
+ assert json.loads(json.dumps(TestInfo(obj))) == {"alist": "['a', 'b', 'c']"}
def test_info_json_encodable_list_does_include_nonexisting():
diff --git a/tests/providers/openlineage/utils/test_utils.py
b/tests/providers/openlineage/utils/test_utils.py
index d340d39e17..d97a447e99 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -142,7 +142,7 @@ def test_get_airflow_dag_run_facet():
"owner": "airflow",
"timetable": {},
"start_date": "2024-06-01T00:00:00+00:00",
- "tags": ["test"],
+ "tags": "['test']",
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
expected_dag_info["schedule_interval"] = "@once"