Repository: incubator-airflow
Updated Branches:
  refs/heads/master 19ed9001b -> 43bf89da7
[AIRFLOW-731] Fix period bug for NamedHivePartitionSensor

Fix a bug in partition name parsing for the operator.

Closes #1973 from artwr/artwr-bugfix_for_named_partition_sensor_and_periods


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43bf89da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43bf89da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43bf89da

Branch: refs/heads/master
Commit: 43bf89da7bf6700fda9fdf3f64032a79e5fa76b4
Parents: 19ed900
Author: Arthur Wiedmer <[email protected]>
Authored: Sun Jan 8 14:47:16 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Sun Jan 8 14:47:26 2017 +0100

----------------------------------------------------------------------
 airflow/operators/sensors.py     |  5 +-
 tests/operators/hive_operator.py | 96 ++++++++++++++++++++++-------------
 2 files changed, 64 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index da01483..f5dd148 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -270,7 +270,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
             self,
             partition_names,
             metastore_conn_id='metastore_default',
-            poke_interval=60*3,
+            poke_interval=60 * 3,
             *args, **kwargs):
         super(NamedHivePartitionSensor, self).__init__(
@@ -283,9 +283,10 @@ class NamedHivePartitionSensor(BaseSensorOperator):
         self.partition_names = partition_names
         self.next_poke_idx = 0
 
+    @classmethod
     def parse_partition_name(self, partition):
         try:
-            schema, table_partition = partition.split('.')
+            schema, table_partition = partition.split('.', 1)
             table, partition = table_partition.split('/', 1)
             return schema, table, partition
         except ValueError as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bf89da/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 9f90999..fec5e69 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -21,7 +21,7 @@
 import mock
 import nose
 import six
 
-from airflow import DAG, configuration, operators, utils
+from airflow import DAG, configuration, operators
 
 configuration.load_test_config()
@@ -67,10 +67,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
             hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
 
-        def connect_mock(host, port, auth_mechanism, kerberos_service_name, user, database):
+        def connect_mock(self, host, port,
+                         auth_mechanism, kerberos_service_name,
+                         user, database):
             self.assertEqual(database, self.nondefault_schema)
 
-        @patch('HiveServer2Hook.connect', return_value="foo")
+        @mock.patch('HiveServer2Hook.connect', return_value="foo")
         def test_select_conn_with_schema(self, connect_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -94,15 +96,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             schema = "notdefault"
             hook = HiveServer2Hook()
             cursor_mock = MagicMock(
-                __enter__ = cursor_mock,
-                __exit__ = None,
-                execute = None,
-                fetchall = [],
+                __enter__=cursor_mock,
+                __exit__=None,
+                execute=None,
+                fetchall=[],
             )
             get_conn_mock = MagicMock(
-                __enter__ = get_conn_mock,
-                __exit__ = None,
-                cursor = cursor_mock,
+                __enter__=get_conn_mock,
+                __exit__=None,
+                cursor=cursor_mock,
             )
             hook.get_conn = get_conn_mock
@@ -112,7 +114,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             # Verify
             get_conn_mock.assert_called_with(self.nondefault_schema)
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_records_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -124,12 +126,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_records(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
 
-        @patch('HiveServer2Hook.get_results', return_value={data:[]})
+        @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_pandas_df_with_schema(self, get_results_mock):
             from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -138,11 +140,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook = HiveServer2Hook()
 
             # Run
-            hook.get_pandas_df(sql, schema)
+            hook.get_pandas_df(sql, self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
-            (args, kwargs) = connect_mock.call_args_list[0]
+            assert self.connect_mock.called
+            (args, kwargs) = self.connect_mock.call_args_list[0]
             assert args[0] == sql
             assert kwargs['schema'] == self.nondefault_schema
@@ -172,7 +174,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_queues(self):
             import airflow.operators.hive_operator
@@ -181,8 +184,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_dryrun(self):
             import airflow.operators.hive_operator
@@ -195,7 +198,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto(self):
             sql = """
@@ -204,7 +208,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
             import airflow.operators.presto_to_mysql
@@ -218,14 +223,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mysql_table='test_static_babynames',
                 mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hdfs_sensor(self):
             t = operators.sensors.HdfsSensor(
                 task_id='hdfs_sensor_check',
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_webhdfs_sensor(self):
             t = operators.sensors.WebHdfsSensor(
@@ -233,7 +240,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 timeout=120,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_sql_sensor(self):
             t = operators.sensors.SqlSensor(
@@ -241,7 +249,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 conn_id='presto_default',
                 sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_stats(self):
             import airflow.operators.hive_stats_operator
@@ -250,14 +259,18 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table="airflow.static_babynames_partitioned",
                 partition={'ds': DEFAULT_DATE_DS},
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor(self):
             t = operators.sensors.NamedHivePartitionSensor(
                 task_id='hive_partition_check',
-                partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}"
+                ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
             t = operators.sensors.NamedHivePartitionSensor(
@@ -267,7 +280,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                     "airflow.static_babynames_partitioned/ds={{ds}}"
                 ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
+
+        def test_named_hive_partition_sensor_parses_partitions_with_periods(self):
+            t = operators.sensors.NamedHivePartitionSensor.parse_partition_name(
+                partition="schema.table/part1=this.can.be.an.issue/part2=ok")
+            self.assertEqual(t[0], "schema")
+            self.assertEqual(t[1], "table")
+            self.assertEqual(t[2], "part1=this.can.be.an.issue/part2=ok")
 
         @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
         def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
@@ -280,14 +301,16 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 poke_interval=0.1,
                 timeout=1,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',
                 table='airflow.static_babynames_partitioned',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_metastore_sql_sensor(self):
             t = operators.sensors.MetastorePartitionSensor(
@@ -295,7 +318,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table='airflow.static_babynames_partitioned',
                 partition_name='ds={}'.format(DEFAULT_DATE_DS),
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive2samba(self):
             import airflow.operators.hive_to_samba_operator
@@ -305,7 +329,8 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
                 destination_filepath='test_airflow.csv',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
             import airflow.operators.hive_to_mysql
@@ -325,4 +350,5 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 ],
                 dag=self.dag)
             t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+                  ignore_ti_state=True)
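
For context on the parsing change above, here is a minimal standalone sketch (not part of the commit) of the behaviour the new test exercises: splitting the partition name with maxsplit=1 keeps periods inside partition values intact, whereas the old single-argument split failed on such names. The function below only mirrors the logic of NamedHivePartitionSensor.parse_partition_name; the names and output shown are illustrative.

    # Illustrative sketch only -- mirrors the post-fix parsing logic of
    # NamedHivePartitionSensor.parse_partition_name.
    def parse_partition_name(partition):
        # Split off the schema at the first '.' and the table at the first '/';
        # maxsplit=1 leaves later periods and slashes in the partition spec alone.
        schema, table_partition = partition.split('.', 1)
        table, partition = table_partition.split('/', 1)
        return schema, table, partition

    print(parse_partition_name("schema.table/part1=this.can.be.an.issue/part2=ok"))
    # -> ('schema', 'table', 'part1=this.can.be.an.issue/part2=ok')

    # Before the fix, partition.split('.') produced six pieces for this name,
    # so the two-target unpacking raised ValueError ("too many values to unpack").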
