Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0fc45045a -> b532d8d77
[AIRFLOW-1300] Enable table creation with TBLPROPERTIES

Enable the TBLPROPERTIES parameter in the load_df and load_file methods
of HiveCliHook and in the Hive transfer operators.

Closes #2364 from krishnabhupatiraju/tblproperties_hiveclihook

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b532d8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b532d8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b532d8d7

Branch: refs/heads/master
Commit: b532d8d7742e0a1141732654c2796bfc6dc6cabc
Parents: 0fc4504
Author: Krishna Bhupatiraju <[email protected]>
Authored: Mon Jul 10 12:14:19 2017 -0700
Committer: Alex Guziel <[email protected]>
Committed: Mon Jul 10 12:14:19 2017 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py              | 29 ++++++++++++++++++++-------
 airflow/operators/mssql_to_hive.py       |  7 ++++++-
 airflow/operators/mysql_to_hive.py       |  7 ++++++-
 airflow/operators/s3_to_hive_operator.py | 10 +++++++--
 tests/operators/operators.py             | 16 +++++++++++++++
 5 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
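For illustration, a minimal sketch of how the new parameter is meant to be
used through HiveCliHook.load_file once this commit is applied. The
connection id, file path, table name, field types and property are all
invented for the example; this is not code from the commit itself.

    # Minimal sketch, assuming a reachable Hive CLI connection; every
    # concrete value below (path, table, columns, property) is made up.
    from airflow.hooks.hive_hooks import HiveCliHook

    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
    hook.load_file(
        filepath='/tmp/baby_names.csv',               # hypothetical local file
        table='mydb.baby_names',                      # hypothetical target table
        delimiter=',',
        field_dict={'name': 'STRING', 'num': 'INT'},  # columns -> Hive types
        create=True,
        recreate=True,
        tblproperties={'serialization.null.format': ''},  # new in this commit
    )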
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3e7d2db..d120769 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -343,7 +343,8 @@ class HiveCliHook(BaseHook):
             create=True,
             overwrite=True,
             partition=None,
-            recreate=False):
+            recreate=False,
+            tblproperties=None):
         """
         Loads a local file into Hive
 
@@ -354,19 +355,28 @@ class HiveCliHook(BaseHook):
         stage the data into a temporary table before loading it into its
         final destination using a ``HiveOperator``.
 
+        :param filepath: local filepath of the file to load
+        :type filepath: str
         :param table: target Hive table, use dot notation to target a
             specific database
         :type table: str
+        :param delimiter: field delimiter in the file
+        :type delimiter: str
+        :param field_dict: A dictionary of the fields name in the file
+            as keys and their Hive types as values
+        :type field_dict: dict
         :param create: whether to create the table if it doesn't exist
         :type create: bool
-        :param recreate: whether to drop and recreate the table at every
-            execution
-        :type recreate: bool
+        :param overwrite: whether to overwrite the data in table or partition
+        :type overwrite: bool
         :param partition: target partition as a dict of partition columns
             and values
         :type partition: dict
-        :param delimiter: field delimiter in the file
-        :type delimiter: str
+        :param recreate: whether to drop and recreate the table at every
+            execution
+        :type recreate: bool
+        :param tblproperties: TBLPROPERTIES of the hive table being created
+        :type tblproperties: dict
         """
         hql = ''
         if recreate:
@@ -383,7 +393,12 @@ class HiveCliHook(BaseHook):
             hql += "PARTITIONED BY ({pfields})\n"
         hql += "ROW FORMAT DELIMITED\n"
         hql += "FIELDS TERMINATED BY '{delimiter}'\n"
-        hql += "STORED AS textfile;"
+        hql += "STORED AS textfile\n"
+        if tblproperties is not None:
+            tprops = ", ".join(
+                ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()])
+            hql += "TBLPROPERTIES({tprops})\n"
+        hql += ";"
         hql = hql.format(**locals())
         logging.info(hql)
         self.run_cli(hql)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 6d7521e..a0a2e10 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -57,6 +57,8 @@ class MsSqlToHiveTransfer(BaseOperator):
     :type mssql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('sql', 'partition', 'hive_table')
@@ -74,6 +76,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mssql_conn_id='mssql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MsSqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -85,6 +88,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         self.mssql_conn_id = mssql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties
 
     @classmethod
     def type_map(cls, mssql_type):
@@ -124,4 +128,5 @@ class MsSqlToHiveTransfer(BaseOperator):
             create=self.create,
             partition=self.partition,
             delimiter=self.delimiter,
-            recreate=self.recreate)
+            recreate=self.recreate,
+            tblproperties=self.tblproperties)
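For reference, when tblproperties is supplied, the DDL assembled by the
new branch in load_file comes out along these lines. This is a hand
reconstruction from the hql += lines above, assuming no partition; the
CREATE TABLE prefix itself sits outside the hunk shown, so its exact
shape here is an assumption, as are the table name and columns.

    # Hand-reconstruction (not captured output) of the statement built
    # for tblproperties={'key': 'value'} with the field_dict/delimiter
    # from the earlier sketch; table name and columns are illustrative.
    expected_hql = (
        "CREATE TABLE IF NOT EXISTS mydb.baby_names (name STRING, num INT)\n"
        "ROW FORMAT DELIMITED\n"
        "FIELDS TERMINATED BY ','\n"
        "STORED AS textfile\n"
        "TBLPROPERTIES('key'='value')\n"
        ";"
    )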
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index 2fa2541..ad3ecae 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -58,6 +58,8 @@ class MySqlToHiveTransfer(BaseOperator):
     :type mysql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('sql', 'partition', 'hive_table')
@@ -75,6 +77,7 @@ class MySqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mysql_conn_id='mysql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MySqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -86,6 +89,7 @@ class MySqlToHiveTransfer(BaseOperator):
         self.mysql_conn_id = mysql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties
 
     @classmethod
     def type_map(cls, mysql_type):
@@ -128,4 +132,5 @@ class MySqlToHiveTransfer(BaseOperator):
             create=self.create,
             partition=self.partition,
             delimiter=self.delimiter,
-            recreate=self.recreate)
+            recreate=self.recreate,
+            tblproperties=self.tblproperties)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 92340f8..7ae0616 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -78,6 +78,8 @@ class S3ToHiveTransfer(BaseOperator):
     :param input_compressed: Boolean to determine if file decompression is
         required to process headers
     :type input_compressed: bool
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('s3_key', 'partition', 'hive_table')
@@ -100,6 +102,7 @@ class S3ToHiveTransfer(BaseOperator):
             s3_conn_id='s3_default',
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
+            tblproperties=None,
             *args, **kwargs):
         super(S3ToHiveTransfer, self).__init__(*args, **kwargs)
         self.s3_key = s3_key
@@ -115,6 +118,7 @@ class S3ToHiveTransfer(BaseOperator):
         self.hive_cli_conn_id = hive_cli_conn_id
         self.s3_conn_id = s3_conn_id
         self.input_compressed = input_compressed
+        self.tblproperties = tblproperties
 
         if (self.check_headers and
                 not (self.field_dict is not None and self.headers)):
@@ -156,7 +160,8 @@ class S3ToHiveTransfer(BaseOperator):
                 create=self.create,
                 partition=self.partition,
                 delimiter=self.delimiter,
-                recreate=self.recreate)
+                recreate=self.recreate,
+                tblproperties=self.tblproperties)
         else:
             # Decompressing file
             if self.input_compressed:
@@ -193,7 +198,8 @@ class S3ToHiveTransfer(BaseOperator):
                 create=self.create,
                 partition=self.partition,
                 delimiter=self.delimiter,
-                recreate=self.recreate)
+                recreate=self.recreate,
+                tblproperties=self.tblproperties)
 
     def _get_top_row_as_list(self, file_name):
         with open(file_name, 'rt') as f:
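The same knob on the S3 operator, as a hypothetical DAG task; the bucket,
key, field types and property value are invented, and `dag` is assumed to
be an existing DAG object. The test below exercises the MySQL variant.

    # Hypothetical usage sketch, not from the commit: tblproperties is
    # simply forwarded to HiveCliHook.load_file by the operator.
    from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer

    t = S3ToHiveTransfer(
        task_id='s3_to_hive_tblproperties',
        s3_key='s3://my-bucket/baby_names.csv',       # invented bucket/key
        field_dict={'name': 'STRING', 'num': 'INT'},
        hive_table='test_s3_to_hive',
        delimiter=',',
        recreate=True,
        tblproperties={'skip.header.line.count': '1'},  # new in this commit
        dag=dag,
    )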
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 62bc4bf..0f5abd5 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -283,3 +283,19 @@ class TransferTests(unittest.TestCase):
             delimiter=",",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_mysql_to_hive_tblproperties(self):
+        # import airflow.operators
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive',
+            recreate=True,
+            delimiter=",",
+            tblproperties={'test_property':'test_value'},
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
