This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 00db98b5fe fix: cast list to flattened string in openlineage InfoJsonEncodable (#41786)
00db98b5fe is described below

commit 00db98b5fea9c6341972d07b9644ac7e563789c1
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Mon Sep 2 13:26:36 2024 +0200

    fix: cast list to flattened string in openlineage InfoJsonEncodable (#41786)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/utils/utils.py       |  2 +-
 .../providers/openlineage/plugins/test_adapter.py  | 99 +++++++++++-----------
 tests/providers/openlineage/plugins/test_utils.py  |  4 +-
 tests/providers/openlineage/utils/test_utils.py    |  2 +-
 4 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index 316666f2c0..ec58c6e2d7 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -199,7 +199,7 @@ class InfoJsonEncodable(dict):
             return value.isoformat()
         if isinstance(value, datetime.timedelta):
             return f"{value.total_seconds()} seconds"
-        if isinstance(value, (set, tuple)):
+        if isinstance(value, (set, list, tuple)):
             return str(list(value))
         return value
 
diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py
index 19bba5fffb..18f457c5be 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -528,7 +528,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
     mock_stats_timer.assert_called_with("ol.emit.attempts")
 
 
-@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True)
+@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False)
 @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
 @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
@@ -578,7 +578,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
         "description": "dag desc",
         "owner": "airflow",
         "start_date": "2024-06-01T00:00:00+00:00",
-        "tags": [],
+        "tags": "[]",
         "fileloc": pathlib.Path(__file__).resolve().as_posix(),
     }
     if hasattr(dag, "schedule_interval"):  # Airflow 2 compat.
@@ -587,56 +587,53 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
         expected_dag_info["timetable_summary"] = "1 day, 0:00:00"
 
     assert len(client.emit.mock_calls) == 1
-    assert (
-        call(
-            RunEvent(
-                eventType=RunState.START,
-                eventTime=event_time.isoformat(),
-                run=Run(
-                    runId=random_uuid,
-                    facets={
-                        "nominalTime": nominal_time_run.NominalTimeRunFacet(
-                            nominalStartTime=event_time.isoformat(),
-                            nominalEndTime=event_time.isoformat(),
-                        ),
-                        "airflowDagRun": AirflowDagRunFacet(
-                            dag=expected_dag_info,
-                            dagRun={
-                                "conf": {},
-                                "dag_id": "dag_id",
-                                "data_interval_start": event_time.isoformat(),
-                                "data_interval_end": event_time.isoformat(),
-                                "external_trigger": None,
-                                "run_id": run_id,
-                                "run_type": None,
-                                "start_date": event_time.isoformat(),
-                            },
-                        ),
-                        "debug": AirflowDebugRunFacet(packages=ANY),
-                    },
-                ),
-                job=Job(
-                    namespace=namespace(),
-                    name="dag_id",
-                    facets={
-                        "documentation": 
documentation_job.DocumentationJobFacet(description="dag desc"),
-                        "ownership": ownership_job.OwnershipJobFacet(
-                            owners=[
-                                ownership_job.Owner(name="airflow", type=None),
-                            ]
-                        ),
-                        **job_facets,
-                        "jobType": job_type_job.JobTypeJobFacet(
-                            processingType="BATCH", integration="AIRFLOW", jobType="DAG"
-                        ),
-                    },
-                ),
-                producer=_PRODUCER,
-                inputs=[],
-                outputs=[],
-            )
+    client.emit.assert_called_once_with(
+        RunEvent(
+            eventType=RunState.START,
+            eventTime=event_time.isoformat(),
+            run=Run(
+                runId=random_uuid,
+                facets={
+                    "nominalTime": nominal_time_run.NominalTimeRunFacet(
+                        nominalStartTime=event_time.isoformat(),
+                        nominalEndTime=event_time.isoformat(),
+                    ),
+                    "airflowDagRun": AirflowDagRunFacet(
+                        dag=expected_dag_info,
+                        dagRun={
+                            "conf": {},
+                            "dag_id": "dag_id",
+                            "data_interval_start": event_time.isoformat(),
+                            "data_interval_end": event_time.isoformat(),
+                            "external_trigger": None,
+                            "run_id": run_id,
+                            "run_type": None,
+                            "start_date": event_time.isoformat(),
+                        },
+                    ),
+                    # "debug": AirflowDebugRunFacet(packages=ANY),
+                },
+            ),
+            job=Job(
+                namespace=namespace(),
+                name="dag_id",
+                facets={
+                    "documentation": 
documentation_job.DocumentationJobFacet(description="dag desc"),
+                    "ownership": ownership_job.OwnershipJobFacet(
+                        owners=[
+                            ownership_job.Owner(name="airflow", type=None),
+                        ]
+                    ),
+                    **job_facets,
+                    "jobType": job_type_job.JobTypeJobFacet(
+                        processingType="BATCH", integration="AIRFLOW", jobType="DAG"
+                    ),
+                },
+            ),
+            producer=_PRODUCER,
+            inputs=[],
+            outputs=[],
         )
-        in client.emit.mock_calls
     )
 
     mock_stats_incr.assert_not_called()
diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py
index 0be62a5b0c..382f806f23 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -164,7 +164,7 @@ def test_info_json_encodable_without_slots():
     }
 
 
-def test_info_json_encodable_list_does_not_flatten():
+def test_info_json_encodable_list_does_flatten():
     class TestInfo(InfoJsonEncodable):
         includes = ["alist"]
 
@@ -174,7 +174,7 @@ def test_info_json_encodable_list_does_not_flatten():
 
     obj = Test(["a", "b", "c"])
 
-    assert json.loads(json.dumps(TestInfo(obj))) == {"alist": ["a", "b", "c"]}
+    assert json.loads(json.dumps(TestInfo(obj))) == {"alist": "['a', 'b', 'c']"}
 
 
 def test_info_json_encodable_list_does_include_nonexisting():
diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py
index d340d39e17..d97a447e99 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -142,7 +142,7 @@ def test_get_airflow_dag_run_facet():
         "owner": "airflow",
         "timetable": {},
         "start_date": "2024-06-01T00:00:00+00:00",
-        "tags": ["test"],
+        "tags": "['test']",
     }
     if hasattr(dag, "schedule_interval"):  # Airflow 2 compat.
         expected_dag_info["schedule_interval"] = "@once"

Reply via email to