Repository: incubator-airflow Updated Branches: refs/heads/master f7c33afea -> 74027c9a6
[AIRFLOW-2441] Fix bugs in HiveCliHook.load_df This PR fixes HiveCliHook.load_df to: 1. encode the delimiter with the specified encoding before passing it to pandas.DataFrame.to_csv, so that the call does not fail 2. flush the output file written by pandas.DataFrame.to_csv before executing the LOAD DATA statement 3. omit the header and row index from the output file written by pandas.DataFrame.to_csv, so that Hive reads it as expected Closes #3334 from sekikn/AIRFLOW-2441 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/74027c9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/74027c9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/74027c9a Branch: refs/heads/master Commit: 74027c9a6ba5f54a7b6392f6dd79d5b8a8782d7b Parents: f7c33af Author: Kengo Seki <[email protected]> Authored: Thu May 10 10:34:58 2018 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Thu May 10 10:34:58 2018 +0200 ---------------------------------------------------------------------- airflow/hooks/hive_hooks.py | 7 ++++++- tests/hooks/test_hive_hook.py | 27 +++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 0b7b056..6c9a907 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -336,7 +336,12 @@ class HiveCliHook(BaseHook): if field_dict is None and (create or recreate): field_dict = _infer_field_types_from_df(df) - df.to_csv(f, sep=delimiter, **pandas_kwargs) + df.to_csv(path_or_buf=f, + sep=delimiter.encode(encoding), + header=False, + index=False, + **pandas_kwargs) + f.flush() return self.load_file(filepath=f.name, table=table, 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/74027c9a/tests/hooks/test_hive_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py index e16bb38..9eda9e3 100644 --- a/tests/hooks/test_hive_hook.py +++ b/tests/hooks/test_hive_hook.py @@ -19,6 +19,7 @@ # import datetime +import pandas as pd import random import mock @@ -105,6 +106,32 @@ class TestHiveCliHook(unittest.TestCase): ) mock_run_cli.assert_called_with(query) + @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file') + @mock.patch('pandas.DataFrame.to_csv') + def test_load_df(self, mock_to_csv, mock_load_file): + df = pd.DataFrame({"c": ["foo", "bar", "baz"]}) + table = "t" + delimiter = "," + encoding = "utf-8" + + hook = HiveCliHook() + hook.load_df(df=df, + table=table, + delimiter=delimiter, + encoding=encoding) + + mock_to_csv.assert_called_once() + kwargs = mock_to_csv.call_args[1] + self.assertEqual(kwargs["header"], False) + self.assertEqual(kwargs["index"], False) + self.assertEqual(kwargs["sep"], delimiter.encode(encoding)) + + mock_load_file.assert_called_once() + kwargs = mock_load_file.call_args[1] + self.assertEqual(kwargs["delimiter"], delimiter) + self.assertEqual(kwargs["field_dict"], {"c": u"STRING"}) + self.assertEqual(kwargs["table"], table) + class TestHiveMetastoreHook(HiveEnvironmentTest): VALID_FILTER_MAP = {'key2': 'value2'}
