Repository: incubator-airflow Updated Branches: refs/heads/master cd332761c -> 93538cae9
[AIRFLOW-685] Add test for MySqlHook.bulk_load() Closes #1929 from sekikn/AIRFLOW-685 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/93538cae Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/93538cae Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/93538cae Branch: refs/heads/master Commit: 93538cae9fb804a113e8493d67d9bb94edbe87fb Parents: cd33276 Author: Kengo Seki <[email protected]> Authored: Sun Dec 25 14:32:42 2016 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Sun Dec 25 14:32:45 2016 +0100 ---------------------------------------------------------------------- airflow/utils/db.py | 2 +- tests/operators/operators.py | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93538cae/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 0d6f3d5..9c7b4b3 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -114,7 +114,7 @@ def initdb(): merge_conn( models.Connection( conn_id='airflow_ci', conn_type='mysql', - host='localhost', login='root', + host='localhost', login='root', extra="{\"local_infile\": true}", schema='airflow_ci')) merge_conn( models.Connection( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93538cae/tests/operators/operators.py ---------------------------------------------------------------------- diff --git a/tests/operators/operators.py b/tests/operators/operators.py index 60e7df4..7458827 100644 --- a/tests/operators/operators.py +++ b/tests/operators/operators.py @@ -71,6 +71,28 @@ class MySqlTest(unittest.TestCase): sql=sql, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def mysql_hook_test_bulk_load(self): + records = ("foo", "bar", "baz") + + import tempfile + with tempfile.NamedTemporaryFile() as t: + t.write("\n".join(records).encode('utf8')) + t.flush() + + from airflow.hooks.mysql_hook import MySqlHook + h = MySqlHook('airflow_ci') + with h.get_conn() as c: + c.execute(""" + CREATE TABLE IF NOT EXISTS test_airflow ( + dummy VARCHAR(50) + ) + """) + c.execute("TRUNCATE TABLE test_airflow") + h.bulk_load("test_airflow", t.name) + c.execute("SELECT dummy FROM test_airflow") + results = tuple(result[0] for result in c.fetchall()) + assert sorted(records) == sorted(results) + def test_mysql_to_mysql(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" import airflow.operators.generic_transfer
