This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a50c8c406 Refactor: use tmp_path in postgres test (#33545)
6a50c8c406 is described below

commit 6a50c8c406d74f5da571e3d1fcc229791cd65684
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Sun Aug 20 21:55:51 2023 +0000

    Refactor: use tmp_path in postgres test (#33545)
---
 tests/providers/postgres/hooks/test_postgres.py | 42 +++++++++++--------------
 1 file changed, 19 insertions(+), 23 deletions(-)

diff --git a/tests/providers/postgres/hooks/test_postgres.py b/tests/providers/postgres/hooks/test_postgres.py
index d90b0def66..53351da69e 100644
--- a/tests/providers/postgres/hooks/test_postgres.py
+++ b/tests/providers/postgres/hooks/test_postgres.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import json
-from tempfile import NamedTemporaryFile
+import os
 from unittest import mock
 
 import psycopg2.extras
@@ -357,40 +357,36 @@ class TestPostgresHook:
             self.cur.copy_expert.assert_called_once_with(statement, open_mock.return_value)
             assert open_mock.call_args.args == (filename, "r+")
 
-    def test_bulk_load(self):
+    def test_bulk_load(self, tmp_path):
         hook = PostgresHook()
         input_data = ["foo", "bar", "baz"]
 
-        with hook.get_conn() as conn:
-            with conn.cursor() as cur:
-                cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
-                conn.commit()
+        with hook.get_conn() as conn, conn.cursor() as cur:
+            cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
+            conn.commit()
 
-                with NamedTemporaryFile() as f:
-                    f.write("\n".join(input_data).encode("utf-8"))
-                    f.flush()
-                    hook.bulk_load(self.table, f.name)
+            path = tmp_path / "testfile"
+            path.write_text("\n".join(input_data))
+            hook.bulk_load(self.table, os.fspath(path))
 
-                cur.execute(f"SELECT * FROM {self.table}")
-                results = [row[0] for row in cur.fetchall()]
+            cur.execute(f"SELECT * FROM {self.table}")
+            results = [row[0] for row in cur.fetchall()]
 
         assert sorted(input_data) == sorted(results)
 
-    def test_bulk_dump(self):
+    def test_bulk_dump(self, tmp_path):
         hook = PostgresHook()
         input_data = ["foo", "bar", "baz"]
 
-        with hook.get_conn() as conn:
-            with conn.cursor() as cur:
-                cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
-                values = ",".join(f"('{data}')" for data in input_data)
-                cur.execute(f"INSERT INTO {self.table} VALUES {values}")
-                conn.commit()
+        with hook.get_conn() as conn, conn.cursor() as cur:
+            cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
+            values = ",".join(f"('{data}')" for data in input_data)
+            cur.execute(f"INSERT INTO {self.table} VALUES {values}")
+            conn.commit()
 
-                with NamedTemporaryFile() as f:
-                    hook.bulk_dump(self.table, f.name)
-                    f.seek(0)
-                    results = [line.rstrip().decode("utf-8") for line in f.readlines()]
+        path = tmp_path / "testfile"
+        hook.bulk_dump(self.table, os.fspath(path))
+        results = [line.rstrip() for line in path.read_text().splitlines()]
 
         assert sorted(input_data) == sorted(results)
 

Reply via email to