This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6a50c8c406 Refactor: use tmp_path in postgres test (#33545)
6a50c8c406 is described below
commit 6a50c8c406d74f5da571e3d1fcc229791cd65684
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Sun Aug 20 21:55:51 2023 +0000
Refactor: use tmp_path in postgres test (#33545)
---
tests/providers/postgres/hooks/test_postgres.py | 42 +++++++++++--------------
1 file changed, 19 insertions(+), 23 deletions(-)
diff --git a/tests/providers/postgres/hooks/test_postgres.py b/tests/providers/postgres/hooks/test_postgres.py
index d90b0def66..53351da69e 100644
--- a/tests/providers/postgres/hooks/test_postgres.py
+++ b/tests/providers/postgres/hooks/test_postgres.py
@@ -18,7 +18,7 @@
from __future__ import annotations
import json
-from tempfile import NamedTemporaryFile
+import os
from unittest import mock
import psycopg2.extras
@@ -357,40 +357,36 @@ class TestPostgresHook:
self.cur.copy_expert.assert_called_once_with(statement, open_mock.return_value)
assert open_mock.call_args.args == (filename, "r+")
- def test_bulk_load(self):
+ def test_bulk_load(self, tmp_path):
hook = PostgresHook()
input_data = ["foo", "bar", "baz"]
- with hook.get_conn() as conn:
- with conn.cursor() as cur:
- cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
- conn.commit()
+ with hook.get_conn() as conn, conn.cursor() as cur:
+ cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
+ conn.commit()
- with NamedTemporaryFile() as f:
- f.write("\n".join(input_data).encode("utf-8"))
- f.flush()
- hook.bulk_load(self.table, f.name)
+ path = tmp_path / "testfile"
+ path.write_text("\n".join(input_data))
+ hook.bulk_load(self.table, os.fspath(path))
- cur.execute(f"SELECT * FROM {self.table}")
- results = [row[0] for row in cur.fetchall()]
+ cur.execute(f"SELECT * FROM {self.table}")
+ results = [row[0] for row in cur.fetchall()]
assert sorted(input_data) == sorted(results)
- def test_bulk_dump(self):
+ def test_bulk_dump(self, tmp_path):
hook = PostgresHook()
input_data = ["foo", "bar", "baz"]
- with hook.get_conn() as conn:
- with conn.cursor() as cur:
- cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
- values = ",".join(f"('{data}')" for data in input_data)
- cur.execute(f"INSERT INTO {self.table} VALUES {values}")
- conn.commit()
+ with hook.get_conn() as conn, conn.cursor() as cur:
+ cur.execute(f"CREATE TABLE {self.table} (c VARCHAR)")
+ values = ",".join(f"('{data}')" for data in input_data)
+ cur.execute(f"INSERT INTO {self.table} VALUES {values}")
+ conn.commit()
- with NamedTemporaryFile() as f:
- hook.bulk_dump(self.table, f.name)
- f.seek(0)
- results = [line.rstrip().decode("utf-8") for line in f.readlines()]
+ path = tmp_path / "testfile"
+ hook.bulk_dump(self.table, os.fspath(path))
+ results = [line.rstrip() for line in path.read_text().splitlines()]
assert sorted(input_data) == sorted(results)