Repository: incubator-airflow
Updated Branches:
  refs/heads/master 29dbedfd0 -> b7dc31510
[AIRFLOW-2500] Fix MySqlToHiveTransfer to transfer unsigned type properly

MySQL supports unsigned data types, but Hive doesn't.
So if MySqlToHiveTransfer maps MySQL's data types to
Hive's corresponding ones directly (e.g. INT -> INT),
unsigned values over signed type's upper bound
transferred from MySQL are interpreted as invalid by
Hive, and users get NULL. To avoid it, this PR fixes
MySqlToHiveTransfer to map MySQL data types to Hive's
wider ones (e.g. SMALLINT -> INT, INT -> BIGINT, etc.).

Closes #3446 from sekikn/AIRFLOW-2500

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b7dc3151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b7dc3151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b7dc3151

Branch: refs/heads/master
Commit: b7dc315101a0fc924f76e5c5f500c96e85bdd672
Parents: 29dbedf
Author: Kengo Seki <[email protected]>
Authored: Sun Jun 3 15:24:19 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Jun 3 15:24:19 2018 +0200

----------------------------------------------------------------------
 airflow/operators/mysql_to_hive.py |   5 +-
 tests/operators/operators.py       | 112 ++++++++++++++++++++++++++++++++
 2 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7dc3151/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index 22b7ac2..ab7b7fa 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -104,9 +104,10 @@ class MySqlToHiveTransfer(BaseOperator):
             t.DOUBLE: 'DOUBLE',
             t.FLOAT: 'DOUBLE',
             t.INT24: 'INT',
-            t.LONG: 'INT',
-            t.LONGLONG: 'BIGINT',
+            t.LONG: 'BIGINT',
+            t.LONGLONG: 'DECIMAL(38,0)',
             t.SHORT: 'INT',
+            t.TINY: 'SMALLINT',
             t.YEAR: 'INT',
         }
         return d[mysql_type] if mysql_type in d else 'STRING'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7dc3151/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 4eba0a8..07b1396 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -23,8 +23,11 @@ from airflow import DAG, configuration, operators
 from airflow.utils.tests import skipUnlessImported
 from airflow.utils import timezone
 
+from collections import OrderedDict
+
 import os
 import mock
+import six
 import unittest
 
 configuration.load_test_config()
@@ -313,3 +316,112 @@ class TransferTests(unittest.TestCase):
             tblproperties={'test_property':'test_value'},
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
+    def test_mysql_to_hive_type_conversion(self, mock_load_file):
+        mysql_conn_id = 'airflow_ci'
+        mysql_table = 'test_mysql_to_hive'
+
+        from airflow.hooks.mysql_hook import MySqlHook
+        m = MySqlHook(mysql_conn_id)
+
+        try:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+                c.execute("""
+                    CREATE TABLE {} (
+                        c0 TINYINT,
+                        c1 SMALLINT,
+                        c2 MEDIUMINT,
+                        c3 INT,
+                        c4 BIGINT
+                    )
+                """.format(mysql_table))
+
+            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+            t = MySqlToHiveTransfer(
+                task_id='test_m2h',
+                mysql_conn_id=mysql_conn_id,
+                hive_cli_conn_id='beeline_default',
+                sql="SELECT * FROM {}".format(mysql_table),
+                hive_table='test_mysql_to_hive',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+            mock_load_file.assert_called_once()
+            d = OrderedDict()
+            d["c0"] = "SMALLINT"
+            d["c1"] = "INT"
+            d["c2"] = "INT"
+            d["c3"] = "BIGINT"
+            d["c4"] = "DECIMAL(38,0)"
+            self.assertEqual(mock_load_file.call_args[1]["field_dict"], d)
+        finally:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+
+    @unittest.skipIf(six.PY2, "Skip since HiveServer2Hook doesn't work "
+                              "on Python2 for now. See AIRFLOW-2514.")
+    def test_mysql_to_hive_verify_loaded_values(self):
+        mysql_conn_id = 'airflow_ci'
+        mysql_table = 'test_mysql_to_hive'
+        hive_table = 'test_mysql_to_hive'
+
+        from airflow.hooks.mysql_hook import MySqlHook
+        m = MySqlHook(mysql_conn_id)
+
+        try:
+            minmax = (
+                255,
+                65535,
+                16777215,
+                4294967295,
+                18446744073709551615,
+                -128,
+                -32768,
+                -8388608,
+                -2147483648,
+                -9223372036854775808
+            )
+
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
+                c.execute("""
+                    CREATE TABLE {} (
+                        c0 TINYINT UNSIGNED,
+                        c1 SMALLINT UNSIGNED,
+                        c2 MEDIUMINT UNSIGNED,
+                        c3 INT UNSIGNED,
+                        c4 BIGINT UNSIGNED,
+                        c5 TINYINT,
+                        c6 SMALLINT,
+                        c7 MEDIUMINT,
+                        c8 INT,
+                        c9 BIGINT
+                    )
+                """.format(mysql_table))
+                c.execute("""
+                    INSERT INTO {} VALUES (
+                        {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
+                    )
+                """.format(mysql_table, *minmax))
+
+            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+            t = MySqlToHiveTransfer(
+                task_id='test_m2h',
+                mysql_conn_id=mysql_conn_id,
+                hive_cli_conn_id='beeline_default',
+                sql="SELECT * FROM {}".format(mysql_table),
+                hive_table=hive_table,
+                recreate=True,
+                delimiter=",",
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            h = HiveServer2Hook()
+            r = h.get_records("SELECT * FROM {}".format(hive_table))
+            self.assertEqual(r[0], minmax)
+        finally:
+            with m.get_conn() as c:
+                c.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
