Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2f107d8a3 -> 6372770be
[AIRFLOW-1726] Add copy_expert psycopg2 method to PostgresHook Executes SQL using psycopg2 copy_expert method Necessary to execute COPY command without access to a superuser Closes #2698 from andyxhadji/AIRFLOW-1726 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6372770b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6372770b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6372770b Branch: refs/heads/master Commit: 6372770be6aab67654cede81d6b027a838077a8a Parents: 2f107d8 Author: Andy Hadjigeorgiou <[email protected]> Authored: Thu Oct 19 20:50:33 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Thu Oct 19 20:50:52 2017 +0200 ---------------------------------------------------------------------- airflow/hooks/postgres_hook.py | 11 +++++++ tests/hooks/test_postgres_hook.py | 54 ++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/airflow/hooks/postgres_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py index e47f8e3..81e10d7 100644 --- a/airflow/hooks/postgres_hook.py +++ b/airflow/hooks/postgres_hook.py @@ -14,6 +14,7 @@ import psycopg2 import psycopg2.extensions +from contextlib import closing from airflow.hooks.dbapi_hook import DbApiHook @@ -53,6 +54,16 @@ class PostgresHook(DbApiHook): psycopg2_conn = psycopg2.connect(**conn_args) return psycopg2_conn + def copy_expert(self, sql, filename, open=open): + ''' + Executes SQL using psycopg2 copy_expert method + Necessary to execute COPY command without access to a superuser + ''' + f = open(filename, 'w') + with closing(self.get_conn()) as conn: + with closing(conn.cursor()) as cur: + 
cur.copy_expert(sql, f) + @staticmethod def _serialize_cell(cell, conn): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6372770b/tests/hooks/test_postgres_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_postgres_hook.py b/tests/hooks/test_postgres_hook.py new file mode 100644 index 0000000..41bb13e --- /dev/null +++ b/tests/hooks/test_postgres_hook.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import mock +import unittest + +from airflow.hooks.postgres_hook import PostgresHook + + +class TestPostgresHook(unittest.TestCase): + + def setUp(self): + super(TestPostgresHook, self).setUp() + + self.cur = mock.MagicMock() + self.conn = conn = mock.MagicMock() + self.conn.cursor.return_value = self.cur + + class UnitTestPostgresHook(PostgresHook): + conn_name_attr = 'test_conn_id' + + def get_conn(self): + return conn + + self.db_hook = UnitTestPostgresHook() + + def test_copy_expert(self): + m = mock.mock_open(read_data='{"some": "json"}') + with mock.patch('airflow.hooks.postgres_hook.open', m, create=True) as m: + statement = "SQL" + filename = "filename" + + self.cur.fetchall.return_value = None + f = m(filename, 'w') + def test_open(filename, mode): + return f + + self.assertEqual(None, self.db_hook.copy_expert(statement, filename, open=test_open)) + + self.conn.close.assert_called_once() + self.cur.close.assert_called_once() + self.cur.copy_expert.assert_called_once_with(statement, f)
