Repository: incubator-airflow Updated Branches: refs/heads/master 8bad4c943 -> 1db307337
[AIRFLOW-2471] Fix HiveCliHook.load_df to use unused parameters This PR fixes HiveCliHook.load_df so that the create and recreate parameters, which are currently ignored, are passed to load_file as part of kwargs. Closes #3390 from sekikn/AIRFLOW-2471 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1db30733 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1db30733 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1db30733 Branch: refs/heads/master Commit: 1db3073374b6fd033651caf1fcb98e743483fa30 Parents: 8bad4c9 Author: Kengo Seki <[email protected]> Authored: Mon May 21 19:15:51 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Mon May 21 19:15:51 2018 +0100 ---------------------------------------------------------------------- airflow/hooks/hive_hooks.py | 13 +++++-------- tests/hooks/test_hive_hook.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1db30733/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index 01a6a2a..e2e3111 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -285,8 +285,6 @@ class HiveCliHook(BaseHook): self, df, table, - create=True, - recreate=False, field_dict=None, delimiter=',', encoding='utf8', @@ -297,17 +295,16 @@ class HiveCliHook(BaseHook): Hive data types will be inferred if not passed but column names will not be sanitized. 
+ :param df: DataFrame to load into a Hive table + :type df: DataFrame :param table: target Hive table, use dot notation to target a specific database :type table: str - :param create: whether to create the table if it doesn't exist - :type create: bool - :param recreate: whether to drop and recreate the table at every - execution - :type recreate: bool :param field_dict: mapping from column name to hive data type. Note that it must be OrderedDict so as to keep columns' order. :type field_dict: OrderedDict + :param delimiter: field delimiter in the file + :type delimiter: str :param encoding: string encoding to use when writing DataFrame to file :type encoding: str :param pandas_kwargs: passed to DataFrame.to_csv @@ -340,7 +337,7 @@ class HiveCliHook(BaseHook): with TemporaryDirectory(prefix='airflow_hiveop_') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, mode="w") as f: - if field_dict is None and (create or recreate): + if field_dict is None: field_dict = _infer_field_types_from_df(df) df.to_csv(path_or_buf=f, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1db30733/tests/hooks/test_hive_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py index eadb0f1..d090086 100644 --- a/tests/hooks/test_hive_hook.py +++ b/tests/hooks/test_hive_hook.py @@ -19,6 +19,7 @@ # import datetime +import itertools import pandas as pd import random import re @@ -135,6 +136,23 @@ class TestHiveCliHook(unittest.TestCase): self.assertTrue(isinstance(kwargs["field_dict"], OrderedDict)) self.assertEqual(kwargs["table"], table) + @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file') + @mock.patch('pandas.DataFrame.to_csv') + def test_load_df_with_optional_parameters(self, mock_to_csv, mock_load_file): + hook = HiveCliHook() + b = (True, False) + for create, recreate in itertools.product(b, b): + mock_load_file.reset_mock() + hook.load_df(df=pd.DataFrame({"c": range(0, 
10)}), + table="t", + create=create, + recreate=recreate) + + mock_load_file.assert_called_once() + kwargs = mock_load_file.call_args[1] + self.assertEqual(kwargs["create"], create) + self.assertEqual(kwargs["recreate"], recreate) + @mock.patch('airflow.hooks.hive_hooks.HiveCliHook.run_cli') def test_load_df_with_data_types(self, mock_run_cli): d = OrderedDict()
