Repository: incubator-airflow Updated Branches: refs/heads/v1-8-test 1ea4c533f -> 76cbfeb3a
[AIRFLOW-139] Let psycopg2 handle autocommit for PostgresHook The server-side autocommit setting was removed and reimplemented in client applications and languages. Server-side autocommit was causing too many problems with languages and applications that wanted to control their own autocommit behavior, so autocommit was removed from the server and added to individual client APIs as appropriate. Closes #1821 from danielzohar/AIRFLOW-139_vacuum_operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9167f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9167f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9167f3 Branch: refs/heads/v1-8-test Commit: ac9167f37b586f9ece381763b91a0ee25d736f38 Parents: a2b0ea3 Author: Daniel Zohar <[email protected]> Authored: Tue Jan 24 15:45:39 2017 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Jan 24 15:45:48 2017 +0100 ---------------------------------------------------------------------- airflow/hooks/postgres_hook.py | 4 +--- tests/operators/operators.py | 24 ++++++++++++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/airflow/hooks/postgres_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/postgres_hook.py b/airflow/hooks/postgres_hook.py index d096d75..75c8226 100644 --- a/airflow/hooks/postgres_hook.py +++ b/airflow/hooks/postgres_hook.py @@ -26,7 +26,7 @@ class PostgresHook(DbApiHook): ''' conn_name_attr = 'postgres_conn_id' default_conn_name = 'postgres_default' - supports_autocommit = False + supports_autocommit = True def get_conn(self): conn = self.get_connection(self.postgres_conn_id) @@ -41,8 +41,6 @@ class PostgresHook(DbApiHook): if arg_name in 
['sslmode', 'sslcert', 'sslkey', 'sslrootcert', 'sslcrl', 'application_name']: conn_args[arg_name] = arg_val psycopg2_conn = psycopg2.connect(**conn_args) - if psycopg2_conn.server_version < 70400: - self.supports_autocommit = True return psycopg2_conn @staticmethod http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9167f3/tests/operators/operators.py ---------------------------------------------------------------------- diff --git a/tests/operators/operators.py b/tests/operators/operators.py index 7458827..7aaf12e 100644 --- a/tests/operators/operators.py +++ b/tests/operators/operators.py @@ -15,18 +15,14 @@ from __future__ import print_function import datetime -import os -import unittest -import six -from airflow import DAG, configuration, operators, utils +from airflow import DAG, configuration, operators from airflow.utils.tests import skipUnlessImported + configuration.load_test_config() -import os import unittest - DEFAULT_DATE = datetime.datetime(2015, 1, 1) DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat() DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10] @@ -118,6 +114,7 @@ class MySqlTest(unittest.TestCase): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator') class PostgresTest(unittest.TestCase): def setUp(self): @@ -182,6 +179,21 @@ class PostgresTest(unittest.TestCase): dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_vacuum(self): + """ + Verifies the VACUUM operation runs well with the PostgresOperator + """ + import airflow.operators.postgres_operator + + sql = "VACUUM ANALYZE;" + t = operators.postgres_operator.PostgresOperator( + task_id='postgres_operator_test_vacuum', + sql=sql, + dag=self.dag, + autocommit=True) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + @skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator') 
@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator') class TransferTests(unittest.TestCase):
