Repository: incubator-airflow
Updated Branches:
  refs/heads/master cd332761c -> 93538cae9


[AIRFLOW-685] Add test for MySqlHook.bulk_load()

Closes #1929 from sekikn/AIRFLOW-685


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/93538cae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/93538cae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/93538cae

Branch: refs/heads/master
Commit: 93538cae9fb804a113e8493d67d9bb94edbe87fb
Parents: cd33276
Author: Kengo Seki <[email protected]>
Authored: Sun Dec 25 14:32:42 2016 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Sun Dec 25 14:32:45 2016 +0100

----------------------------------------------------------------------
 airflow/utils/db.py          |  2 +-
 tests/operators/operators.py | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93538cae/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 0d6f3d5..9c7b4b3 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -114,7 +114,7 @@ def initdb():
     merge_conn(
         models.Connection(
             conn_id='airflow_ci', conn_type='mysql',
-            host='localhost', login='root',
+            host='localhost', login='root', extra="{\"local_infile\": true}",
             schema='airflow_ci'))
     merge_conn(
         models.Connection(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/93538cae/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 60e7df4..7458827 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -71,6 +71,28 @@ class MySqlTest(unittest.TestCase):
             sql=sql, dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def mysql_hook_test_bulk_load(self):
+        records = ("foo", "bar", "baz")
+
+        import tempfile
+        with tempfile.NamedTemporaryFile() as t:
+            t.write("\n".join(records).encode('utf8'))
+            t.flush()
+
+            from airflow.hooks.mysql_hook import MySqlHook
+            h = MySqlHook('airflow_ci')
+            with h.get_conn() as c:
+                c.execute("""
+                    CREATE TABLE IF NOT EXISTS test_airflow (
+                        dummy VARCHAR(50)
+                    )
+                """)
+                c.execute("TRUNCATE TABLE test_airflow")
+                h.bulk_load("test_airflow", t.name)
+                c.execute("SELECT dummy FROM test_airflow")
+                results = tuple(result[0] for result in c.fetchall())
+                assert sorted(records) == sorted(results)
+
     def test_mysql_to_mysql(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
         import airflow.operators.generic_transfer

Reply via email to