Repository: incubator-airflow Updated Branches: refs/heads/master 0f9f4605f -> b8c2cea36
[AIRFLOW-2150] Use lighter call in HiveMetastoreHook().max_partition() In HiveMetastoreHook().max_partition(), call self.metastore.get_partition_names() instead of self.metastore.get_partitions(), which is extremely expensive for large tables. Closes #3082 from yrqls21/kevin_yang_fix_hive_max_partition Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b8c2cea3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b8c2cea3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b8c2cea3 Branch: refs/heads/master Commit: b8c2cea36299d6a3264d8bb1dc5a3995732b8855 Parents: 0f9f460 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Mar 7 16:12:14 2018 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Mar 7 16:12:18 2018 -0800 ---------------------------------------------------------------------- airflow/hooks/hive_hooks.py | 64 ++++++++++++++++++++++++++++++-------- airflow/macros/hive.py | 2 +- tests/hooks/test_hive_hook.py | 39 +++++++++++++++++++++++ 3 files changed, 91 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8c2cea3/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index cd7319d..128be41 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -429,9 +429,11 @@ class HiveCliHook(BaseHook): class HiveMetastoreHook(BaseHook): - """ Wrapper to interact with the Hive Metastore""" + # java short max val + MAX_PART_COUNT = 32767 + def __init__(self, metastore_conn_id='metastore_default'): self.metastore_conn = self.get_connection(metastore_conn_id) self.metastore = self.get_metastore_client() @@ -601,16 +603,46 @@ class HiveMetastoreHook(BaseHook): if filter: parts = 
self.metastore.get_partitions_by_filter( db_name=schema, tbl_name=table_name, - filter=filter, max_parts=32767) + filter=filter, max_parts=HiveMetastoreHook.MAX_PART_COUNT) else: parts = self.metastore.get_partitions( - db_name=schema, tbl_name=table_name, max_parts=32767) + db_name=schema, tbl_name=table_name, + max_parts=HiveMetastoreHook.MAX_PART_COUNT) self.metastore._oprot.trans.close() pnames = [p.name for p in table.partitionKeys] return [dict(zip(pnames, p.values)) for p in parts] - def max_partition(self, schema, table_name, field=None, filter=None): + @staticmethod + def _get_max_partition_from_part_names(part_names, key_name): + """ + Helper method to get max partition from part names. Works only + when partition format follows '{key}={value}' and key_name is name of + the only partition key. + :param part_names: list of partition names + :type part_names: list + :param key_name: partition key name + :type key_name: str + :return: Max partition or None if part_names is empty. + """ + if not part_names: + return None + + prefix = key_name + '=' + prefix_len = len(key_name) + 1 + max_val = None + for part_name in part_names: + if part_name.startswith(prefix): + if max_val is None: + max_val = part_name[prefix_len:] + else: + max_val = max(max_val, part_name[prefix_len:]) + else: + raise AirflowException( + "Partition name mal-formatted: {}".format(part_name)) + return max_val + + def max_partition(self, schema, table_name, field=None): """ Returns the maximum value for all partitions in a table. Works only for tables that have a single partition key. 
For subpartitioned @@ -621,17 +653,23 @@ class HiveMetastoreHook(BaseHook): >>> hh.max_partition(schema='airflow', table_name=t) '2015-01-01' """ - parts = self.get_partitions(schema, table_name, filter) - if not parts: - return None - elif len(parts[0]) == 1: - field = list(parts[0].keys())[0] - elif not field: + self.metastore._oprot.trans.open() + table = self.metastore.get_table(dbname=schema, tbl_name=table_name) + if len(table.partitionKeys) != 1: raise AirflowException( - "Please specify the field you want the max " - "value for") + "The table isn't partitioned by a single partition key") + + key_name = table.partitionKeys[0].name + if field is not None and key_name != field: + raise AirflowException("Provided field is not the partition key") + + part_names = \ + self.metastore.get_partition_names(schema, + table_name, + max_parts=HiveMetastoreHook.MAX_PART_COUNT) + self.metastore._oprot.trans.close() - return max([p[field] for p in parts]) + return HiveMetastoreHook._get_max_partition_from_part_names(part_names, key_name) def table_exists(self, table_name, db='default'): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8c2cea3/airflow/macros/hive.py ---------------------------------------------------------------------- diff --git a/airflow/macros/hive.py b/airflow/macros/hive.py index c68c293..ef80fc6 100644 --- a/airflow/macros/hive.py +++ b/airflow/macros/hive.py @@ -44,7 +44,7 @@ def max_partition( schema, table = table.split('.') hh = HiveMetastoreHook(metastore_conn_id=metastore_conn_id) return hh.max_partition( - schema=schema, table_name=table, field=field, filter=filter) + schema=schema, table_name=table, field=field) def _closest_date(target_dt, date_list, before_target=None): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b8c2cea3/tests/hooks/test_hive_hook.py ---------------------------------------------------------------------- diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py new file 
mode 100644 index 0000000..c7da8e5 --- /dev/null +++ b/tests/hooks/test_hive_hook.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +from airflow.exceptions import AirflowException +from airflow.hooks.hive_hooks import HiveMetastoreHook + + +class TestHiveMetastoreHook(unittest.TestCase): + def test_get_max_partition_from_empty_part_names(self): + max_partition = \ + HiveMetastoreHook._get_max_partition_from_part_names([], 'some_key') + self.assertIsNone(max_partition) + + def test_get_max_partition_from_mal_formatted_part_names(self): + with self.assertRaises(AirflowException): + HiveMetastoreHook._get_max_partition_from_part_names( + ['bad_partition_name'], 'some_key') + + def test_get_max_partition_from_mal_valid_part_names(self): + max_partition = \ + HiveMetastoreHook._get_max_partition_from_part_names(['some_key=value1', + 'some_key=value2', + 'some_key=value3'], + 'some_key') + self.assertEqual(max_partition, 'value3')