Repository: incubator-airflow Updated Branches: refs/heads/master e48b8e36a -> 67b351183
[AIRFLOW-2448] Enhance HiveCliHook.load_df to work with datetime. HiveCliHook.load_df currently cannot handle a DataFrame that contains datetime values. This PR enhances it to work with datetime, fixes a bug introduced by AIRFLOW-2441, and addresses some flake8 issues. Closes #3364 from sekikn/AIRFLOW-2448 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67b35118 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67b35118 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67b35118 Branch: refs/heads/master Commit: 67b351183b0f85e9484f1f7f70e0b46300753b60 Parents: e48b8e3 Author: Kengo Seki <sek...@apache.org> Authored: Sat May 19 16:59:44 2018 +0100 Committer: Kaxil Naik <kaxiln...@apache.org> Committed: Sat May 19 17:00:47 2018 +0100 ---------------------------------------------------------------------- airflow/hooks/hive_hooks.py | 30 +++++++++++++++---------- tests/hooks/test_hive_hook.py | 45 +++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b35118/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 92958c0..01a6a2a 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -20,11 +20,12 @@ from __future__ import print_function, unicode_literals from six.moves import zip -from past.builtins import basestring +from past.builtins import basestring, unicode import unicodecsv as csv import itertools import re +import six import subprocess import time from collections import OrderedDict @@ -316,15 +317,16 @@ class HiveCliHook(BaseHook): def _infer_field_types_from_df(df): DTYPE_KIND_HIVE_TYPE = { - 'b': 'BOOLEAN', # boolean - 'i': 
'BIGINT', # signed integer - 'u': 'BIGINT', # unsigned integer - 'f': 'DOUBLE', # floating-point - 'c': 'STRING', # complex floating-point - 'O': 'STRING', # object - 'S': 'STRING', # (byte-)string - 'U': 'STRING', # Unicode - 'V': 'STRING' # void + 'b': 'BOOLEAN', # boolean + 'i': 'BIGINT', # signed integer + 'u': 'BIGINT', # unsigned integer + 'f': 'DOUBLE', # floating-point + 'c': 'STRING', # complex floating-point + 'M': 'TIMESTAMP', # datetime + 'O': 'STRING', # object + 'S': 'STRING', # (byte-)string + 'U': 'STRING', # Unicode + 'V': 'STRING' # void } d = OrderedDict() @@ -336,15 +338,19 @@ class HiveCliHook(BaseHook): pandas_kwargs = {} with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir: - with NamedTemporaryFile(dir=tmp_dir) as f: + with NamedTemporaryFile(dir=tmp_dir, mode="w") as f: if field_dict is None and (create or recreate): field_dict = _infer_field_types_from_df(df) df.to_csv(path_or_buf=f, - sep=delimiter.encode(encoding), + sep=(delimiter.encode(encoding) + if six.PY2 and isinstance(delimiter, unicode) + else delimiter), header=False, index=False, + encoding=encoding, + date_format="%Y-%m-%d %H:%M:%S", **pandas_kwargs) f.flush() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67b35118/tests/hooks/test_hive_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py index 3c278e3..eadb0f1 100644 --- a/tests/hooks/test_hive_hook.py +++ b/tests/hooks/test_hive_hook.py @@ -21,6 +21,7 @@ import datetime import pandas as pd import random +import re import mock import unittest @@ -125,7 +126,7 @@ class TestHiveCliHook(unittest.TestCase): kwargs = mock_to_csv.call_args[1] self.assertEqual(kwargs["header"], False) self.assertEqual(kwargs["index"], False) - self.assertEqual(kwargs["sep"], delimiter.encode(encoding)) + self.assertEqual(kwargs["sep"], delimiter) mock_load_file.assert_called_once() kwargs = mock_load_file.call_args[1] @@ 
-134,6 +135,48 @@ class TestHiveCliHook(unittest.TestCase): self.assertTrue(isinstance(kwargs["field_dict"], OrderedDict)) self.assertEqual(kwargs["table"], table) + @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.run_cli') + def test_load_df_with_data_types(self, mock_run_cli): + d = OrderedDict() + d['b'] = [True] + d['i'] = [-1] + d['t'] = [1] + d['f'] = [0.0] + d['c'] = ['c'] + d['M'] = [datetime.datetime(2018, 1, 1)] + d['O'] = [object()] + d['S'] = ['STRING'.encode('utf-8')] + d['U'] = ['STRING'] + d['V'] = [None] + df = pd.DataFrame(d) + + hook = HiveCliHook() + hook.load_df(df, 't') + + query = """ + CREATE TABLE IF NOT EXISTS t ( + b BOOLEAN, + i BIGINT, + t BIGINT, + f DOUBLE, + c STRING, + M TIMESTAMP, + O STRING, + S STRING, + U STRING, + V STRING) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + STORED AS textfile + ; + """ + + def _trim(s): + return re.sub("\s+", " ", s.strip()) + + self.assertEqual(_trim(mock_run_cli.call_args_list[0][0][0]), + _trim(query)) + class TestHiveMetastoreHook(HiveEnvironmentTest): VALID_FILTER_MAP = {'key2': 'value2'}