Repository: incubator-airflow
Updated Branches:
  refs/heads/master 29dbedfd0 -> b7dc31510


[AIRFLOW-2500] Fix MySqlToHiveTransfer to transfer unsigned type properly

MySQL supports unsigned data types, but Hive
doesn't. So if MySqlToHiveTransfer maps MySQL's
data types directly to their Hive counterparts
(e.g. INT -> INT), unsigned values above the
signed type's upper bound that are transferred
from MySQL are treated as invalid by Hive, and
users get NULL.
To avoid this, this PR changes MySqlToHiveTransfer
to map MySQL data types to wider Hive types
(e.g. SMALLINT -> INT, INT -> BIGINT, etc.).

Closes #3446 from sekikn/AIRFLOW-2500


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b7dc3151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b7dc3151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b7dc3151

Branch: refs/heads/master
Commit: b7dc315101a0fc924f76e5c5f500c96e85bdd672
Parents: 29dbedf
Author: Kengo Seki <[email protected]>
Authored: Sun Jun 3 15:24:19 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Jun 3 15:24:19 2018 +0200

----------------------------------------------------------------------
 airflow/operators/mysql_to_hive.py |   5 +-
 tests/operators/operators.py       | 112 ++++++++++++++++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7dc3151/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index 22b7ac2..ab7b7fa 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -104,9 +104,10 @@ class MySqlToHiveTransfer(BaseOperator):
             t.DOUBLE: 'DOUBLE',
             t.FLOAT: 'DOUBLE',
             t.INT24: 'INT',
-            t.LONG: 'INT',
-            t.LONGLONG: 'BIGINT',
+            t.LONG: 'BIGINT',
+            t.LONGLONG: 'DECIMAL(38,0)',
             t.SHORT: 'INT',
+            t.TINY: 'SMALLINT',
             t.YEAR: 'INT',
         }
         return d[mysql_type] if mysql_type in d else 'STRING'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7dc3151/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 4eba0a8..07b1396 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -23,8 +23,11 @@ from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
 from airflow.utils import timezone
 
+from collections import OrderedDict
+
 import os
 import mock
+import six
 import unittest
 
 configuration.load_test_config()
@@ -313,3 +316,112 @@ class TransferTests(unittest.TestCase):
             tblproperties={'test_property':'test_value'},
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
+    def test_mysql_to_hive_type_conversion(self, mock_load_file):
+        mysql_conn_id = 'airflow_ci'
+        mysql_table = 'test_mysql_to_hive'
+
+        from airflow.hooks.mysql_hook import MySqlHook
+        m = MySqlHook(mysql_conn_id)
+
+        try:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+                c.execute("""
+                    CREATE TABLE {} (
+                        c0 TINYINT,
+                        c1 SMALLINT,
+                        c2 MEDIUMINT,
+                        c3 INT,
+                        c4 BIGINT
+                    )
+                """.format(mysql_table))
+
+            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+            t = MySqlToHiveTransfer(
+                task_id='test_m2h',
+                mysql_conn_id=mysql_conn_id,
+                hive_cli_conn_id='beeline_default',
+                sql="SELECT * FROM {}".format(mysql_table),
+                hive_table='test_mysql_to_hive',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+            mock_load_file.assert_called_once()
+            d = OrderedDict()
+            d["c0"] = "SMALLINT"
+            d["c1"] = "INT"
+            d["c2"] = "INT"
+            d["c3"] = "BIGINT"
+            d["c4"] = "DECIMAL(38,0)"
+            self.assertEqual(mock_load_file.call_args[1]["field_dict"], d)
+        finally:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+
+    @unittest.skipIf(six.PY2, "Skip since HiveServer2Hook doesn't work "
+                              "on Python2 for now. See AIRFLOW-2514.")
+    def test_mysql_to_hive_verify_loaded_values(self):
+        mysql_conn_id = 'airflow_ci'
+        mysql_table = 'test_mysql_to_hive'
+        hive_table = 'test_mysql_to_hive'
+
+        from airflow.hooks.mysql_hook import MySqlHook
+        m = MySqlHook(mysql_conn_id)
+
+        try:
+            minmax = (
+                255,
+                65535,
+                16777215,
+                4294967295,
+                18446744073709551615,
+                -128,
+                -32768,
+                -8388608,
+                -2147483648,
+                -9223372036854775808
+            )
+
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+                c.execute("""
+                    CREATE TABLE {} (
+                        c0 TINYINT   UNSIGNED,
+                        c1 SMALLINT  UNSIGNED,
+                        c2 MEDIUMINT UNSIGNED,
+                        c3 INT       UNSIGNED,
+                        c4 BIGINT    UNSIGNED,
+                        c5 TINYINT,
+                        c6 SMALLINT,
+                        c7 MEDIUMINT,
+                        c8 INT,
+                        c9 BIGINT
+                    )
+                """.format(mysql_table))
+                c.execute("""
+                    INSERT INTO {} VALUES (
+                        {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
+                    )
+                """.format(mysql_table, *minmax))
+
+            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+            t = MySqlToHiveTransfer(
+                task_id='test_m2h',
+                mysql_conn_id=mysql_conn_id,
+                hive_cli_conn_id='beeline_default',
+                sql="SELECT * FROM {}".format(mysql_table),
+                hive_table=hive_table,
+                recreate=True,
+                delimiter=",",
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            h = HiveServer2Hook()
+            r = h.get_records("SELECT * FROM {}".format(hive_table))
+            self.assertEqual(r[0], minmax)
+        finally:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))

Reply via email to