Repository: incubator-airflow
Updated Branches:
  refs/heads/master b56e64224 -> 3ceb3abf1


[AIRFLOW-857] Use library assert statements instead of conditionals

Testing Done:
- Travis

Closes #2062 from saguziel/aguziel-fix-asserts


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ceb3abf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ceb3abf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ceb3abf

Branch: refs/heads/master
Commit: 3ceb3abf166b77ea8baeb9015d33223143cdfb8a
Parents: b56e642
Author: Alex Guziel <[email protected]>
Authored: Fri Feb 10 11:53:34 2017 -0800
Committer: Dan Davydov <[email protected]>
Committed: Fri Feb 10 11:53:37 2017 -0800

----------------------------------------------------------------------
 tests/contrib/hooks/test_jira_hook.py | 2 +-
 tests/contrib/operators/dataflow_operator.py | 2 +-
 tests/contrib/operators/jira_operator_test.py | 8 +-
 tests/contrib/sensors/hdfs_sensors.py | 2 +-
 tests/contrib/sensors/jira_sensor_test.py | 4 +-
 tests/core.py | 434 ++++++++++-----------
 tests/dags/no_dags.py | 2 +-
 tests/jobs.py | 2 +-
 tests/models.py | 26 +-
 tests/operators/hive_operator.py | 16 +-
 tests/operators/latest_only_operator.py | 12 +-
 tests/operators/operators.py | 2 +-
 tests/operators/sensors.py | 2 +-
 tests/plugins_manager.py | 14 +-
 tests/www/api/experimental/test_endpoints.py | 8 +-
 15 files changed, 268 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/hooks/test_jira_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_jira_hook.py b/tests/contrib/hooks/test_jira_hook.py
index 1a3d735..977944e 100644
--- a/tests/contrib/hooks/test_jira_hook.py
+++ b/tests/contrib/hooks/test_jira_hook.py
@@ -42,7 +42,7 @@ class TestJiraHook(unittest.TestCase):
     def test_jira_client_connection(self, jira_mock):
jira_hook = JiraHook() - assert jira_mock.called + self.assertTrue(jira_mock.called) self.assertIsInstance(jira_hook.client, Mock) self.assertEqual(jira_hook.client.name, jira_mock.return_value.name) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/operators/dataflow_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py index 5329723..7455a45 100644 --- a/tests/contrib/operators/dataflow_operator.py +++ b/tests/contrib/operators/dataflow_operator.py @@ -69,7 +69,7 @@ class DataFlowPythonOperatorTest(unittest.TestCase): start_python_hook = dataflow_mock.return_value.start_python_dataflow gcs_download_hook = gcs_hook.return_value.download self.dataflow.execute(None) - assert dataflow_mock.called + self.assertTrue(dataflow_mock.called) expected_options = { 'project': 'test', 'staging_location': 'gs://test/staging', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/operators/jira_operator_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/jira_operator_test.py b/tests/contrib/operators/jira_operator_test.py index 0188c0b..6d615df 100644 --- a/tests/contrib/operators/jira_operator_test.py +++ b/tests/contrib/operators/jira_operator_test.py @@ -74,8 +74,8 @@ class TestJiraOperator(unittest.TestCase): jira_ticket_search_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert jira_mock.called - assert jira_mock.return_value.search_issues.called + self.assertTrue(jira_mock.called) + self.assertTrue(jira_mock.return_value.search_issues.called) @patch("airflow.contrib.hooks.jira_hook.JIRA", autospec=True, return_value=jira_client_mock) @@ -93,8 +93,8 @@ class TestJiraOperator(unittest.TestCase): add_comment_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True) - assert jira_mock.called - assert jira_mock.return_value.add_comment.called + self.assertTrue(jira_mock.called) + self.assertTrue(jira_mock.return_value.add_comment.called) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/sensors/hdfs_sensors.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py index cabe349..0e2ed0c 100644 --- a/tests/contrib/sensors/hdfs_sensors.py +++ b/tests/contrib/sensors/hdfs_sensors.py @@ -248,4 +248,4 @@ class HdfsSensorRegexTests(unittest.TestCase): # When # Then with self.assertRaises(AirflowSensorTimeout): - task.execute(None) \ No newline at end of file + task.execute(None) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/sensors/jira_sensor_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py index 5ca58e4..77ca97f 100644 --- a/tests/contrib/sensors/jira_sensor_test.py +++ b/tests/contrib/sensors/jira_sensor_test.py @@ -73,8 +73,8 @@ class TestJiraSensor(unittest.TestCase): ticket_label_sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert jira_mock.called - assert jira_mock.return_value.issue.called + self.assertTrue(jira_mock.called) + self.assertTrue(jira_mock.return_value.issue.called) @staticmethod def field_checker_func(context, issue): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 0bf4052..fba05f7 100644 --- a/tests/core.py +++ b/tests/core.py @@ -134,15 +134,15 @@ class CoreTest(unittest.TestCase): start_date=datetime(2015, 1, 2, 0, 0))) dag_run = 
jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) - assert dag_run is not None - assert dag_run.dag_id == dag.dag_id - assert dag_run.run_id is not None - assert dag_run.run_id != '' - assert dag_run.execution_date == datetime(2015, 1, 2, 0, 0), ( + self.assertIsNotNone(dag_run) + self.assertEqual(dag.dag_id, dag_run.dag_id) + self.assertIsNotNone(dag_run.run_id) + self.assertNotEqual('', dag_run.run_id) + self.assertEqual(datetime(2015, 1, 2, 0, 0), dag_run.execution_date, msg= 'dag_run.execution_date did not match expectation: {0}' .format(dag_run.execution_date)) - assert dag_run.state == State.RUNNING - assert dag_run.external_trigger == False + self.assertEqual(State.RUNNING, dag_run.state) + self.assertFalse(dag_run.external_trigger) dag.clear() def test_schedule_dag_fake_scheduled_previous(self): @@ -165,15 +165,15 @@ class CoreTest(unittest.TestCase): state=State.SUCCESS, external_trigger=True) dag_run = scheduler.create_dag_run(dag) - assert dag_run is not None - assert dag_run.dag_id == dag.dag_id - assert dag_run.run_id is not None - assert dag_run.run_id != '' - assert dag_run.execution_date == DEFAULT_DATE + delta, ( + self.assertIsNotNone(dag_run) + self.assertEqual(dag.dag_id, dag_run.dag_id) + self.assertIsNotNone(dag_run.run_id) + self.assertNotEqual('', dag_run.run_id) + self.assertEqual(DEFAULT_DATE + delta, dag_run.execution_date, msg= 'dag_run.execution_date did not match expectation: {0}' .format(dag_run.execution_date)) - assert dag_run.state == State.RUNNING - assert dag_run.external_trigger == False + self.assertEqual(State.RUNNING, dag_run.state) + self.assertFalse(dag_run.external_trigger) def test_schedule_dag_once(self): """ @@ -189,8 +189,8 @@ class CoreTest(unittest.TestCase): dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) - assert dag_run is not None - assert dag_run2 is None + 
self.assertIsNotNone(dag_run) + self.assertIsNone(dag_run2) dag.clear() def test_fractional_seconds(self): @@ -246,9 +246,9 @@ class CoreTest(unittest.TestCase): additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: - assert dag_run is not None + self.assertIsNotNone(dag_run) - assert additional_dag_run is None + self.assertIsNone(additional_dag_run) @mock.patch('airflow.jobs.datetime', FakeDatetime) def test_schedule_dag_no_end_date_up_to_today_only(self): @@ -288,16 +288,16 @@ class CoreTest(unittest.TestCase): additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: - assert dag_run is not None + self.assertIsNotNone(dag_run) - assert additional_dag_run is None + self.assertIsNone(additional_dag_run) def test_confirm_unittest_mod(self): - assert configuration.get('core', 'unit_test_mode') + self.assertTrue(configuration.get('core', 'unit_test_mode')) def test_pickling(self): dp = self.dag.pickle() - assert self.dag.dag_id == dp.pickle.dag_id + self.assertEqual(dp.pickle.dag_id, self.dag.dag_id) def test_rich_comparison_ops(self): @@ -317,32 +317,32 @@ class CoreTest(unittest.TestCase): d.last_loaded = self.dag.last_loaded # test identity equality - assert self.dag == self.dag + self.assertEqual(self.dag, self.dag) # test dag (in)equality based on _comps - assert self.dag == dag_eq - assert self.dag != dag_diff_name - assert self.dag != dag_diff_load_time + self.assertEqual(dag_eq, self.dag) + self.assertNotEqual(dag_diff_name, self.dag) + self.assertNotEqual(dag_diff_load_time, self.dag) # test dag inequality based on type even if _comps happen to match - assert self.dag != dag_subclass + self.assertNotEqual(dag_subclass, self.dag) # a dag should equal an unpickled version of itself - assert self.dag == pickle.loads(pickle.dumps(self.dag)) + self.assertEqual(pickle.loads(pickle.dumps(self.dag)), self.dag) # dags are ordered based on dag_id no matter what the type is - assert self.dag < dag_diff_name - assert not self.dag 
< dag_diff_load_time - assert self.dag < dag_subclass_diff_name + self.assertLess(self.dag, dag_diff_name) + self.assertGreater(self.dag, dag_diff_load_time) + self.assertLess(self.dag, dag_subclass_diff_name) # greater than should have been created automatically by functools - assert dag_diff_name > self.dag + self.assertGreater(dag_diff_name, self.dag) # hashes are non-random and match equality - assert hash(self.dag) == hash(self.dag) - assert hash(self.dag) == hash(dag_eq) - assert hash(self.dag) != hash(dag_diff_name) - assert hash(self.dag) != hash(dag_subclass) + self.assertEqual(hash(self.dag), hash(self.dag)) + self.assertEqual(hash(dag_eq), hash(self.dag)) + self.assertNotEqual(hash(dag_diff_name), hash(self.dag)) + self.assertNotEqual(hash(dag_subclass), hash(self.dag)) def test_time_sensor(self): t = sensors.TimeSensor( @@ -588,7 +588,7 @@ class CoreTest(unittest.TestCase): on_success_callback=verify_templated_field, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert val['success'] + self.assertTrue(val['success']) def test_template_with_json_variable(self): """ @@ -611,7 +611,7 @@ class CoreTest(unittest.TestCase): on_success_callback=verify_templated_field, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert val['success'] + self.assertTrue(val['success']) def test_template_with_json_variable_as_value(self): """ @@ -635,7 +635,7 @@ class CoreTest(unittest.TestCase): on_success_callback=verify_templated_field, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - assert val['success'] + self.assertTrue(val['success']) def test_template_non_bool(self): """ @@ -673,7 +673,7 @@ class CoreTest(unittest.TestCase): "child_process_log_directory") latest_log_directory_path = os.path.join(log_base_directory, "latest") # verify that the symlink to the latest logs exists - assert os.path.islink(latest_log_directory_path) + 
self.assertTrue(os.path.islink(latest_log_directory_path)) # verify that the symlink points to the correct log directory log_directory = os.path.join(log_base_directory, "2016-01-01") @@ -695,53 +695,53 @@ class CoreTest(unittest.TestCase): def test_variable_set_get_round_trip(self): Variable.set("tested_var_set_id", "Monday morning breakfast") - assert "Monday morning breakfast" == Variable.get("tested_var_set_id") + self.assertEqual("Monday morning breakfast", Variable.get("tested_var_set_id")) def test_variable_set_get_round_trip_json(self): value = {"a": 17, "b": 47} Variable.set("tested_var_set_id", value, serialize_json=True) - assert value == Variable.get("tested_var_set_id", deserialize_json=True) + self.assertEqual(value, Variable.get("tested_var_set_id", deserialize_json=True)) def test_get_non_existing_var_should_return_default(self): default_value = "some default val" - assert default_value == Variable.get("thisIdDoesNotExist", - default_var=default_value) + self.assertEqual(default_value, Variable.get("thisIdDoesNotExist", + default_var=default_value)) def test_get_non_existing_var_should_not_deserialize_json_default(self): default_value = "}{ this is a non JSON default }{" - assert default_value == Variable.get("thisIdDoesNotExist", + self.assertEqual(default_value, Variable.get("thisIdDoesNotExist", default_var=default_value, - deserialize_json=True) + deserialize_json=True)) def test_variable_setdefault_round_trip(self): key = "tested_var_setdefault_1_id" value = "Monday morning breakfast in Paris" Variable.setdefault(key, value) - assert value == Variable.get(key) + self.assertEqual(value, Variable.get(key)) def test_variable_setdefault_round_trip_json(self): key = "tested_var_setdefault_2_id" value = {"city": 'Paris', "Hapiness": True} Variable.setdefault(key, value, deserialize_json=True) - assert value == Variable.get(key, deserialize_json=True) + self.assertEqual(value, Variable.get(key, deserialize_json=True)) def 
test_parameterized_config_gen(self): cfg = configuration.parameterized_config(configuration.DEFAULT_CONFIG) # making sure some basic building blocks are present: - assert "[core]" in cfg - assert "dags_folder" in cfg - assert "sql_alchemy_conn" in cfg - assert "fernet_key" in cfg + self.assertIn("[core]", cfg) + self.assertIn("dags_folder", cfg) + self.assertIn("sql_alchemy_conn", cfg) + self.assertIn("fernet_key", cfg) # making sure replacement actually happened - assert "{AIRFLOW_HOME}" not in cfg - assert "{FERNET_KEY}" not in cfg + self.assertNotIn("{AIRFLOW_HOME}", cfg) + self.assertNotIn("{FERNET_KEY}", cfg) def test_config_use_original_when_original_and_fallback_are_present(self): - assert configuration.has_option("core", "FERNET_KEY") - assert not configuration.has_option("core", "FERNET_KEY_CMD") + self.assertTrue(configuration.has_option("core", "FERNET_KEY")) + self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD")) FERNET_KEY = configuration.get('core', 'FERNET_KEY') @@ -752,14 +752,14 @@ class CoreTest(unittest.TestCase): "FERNET_KEY" ) - assert FALLBACK_FERNET_KEY == FERNET_KEY + self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY) # restore the conf back to the original state configuration.remove_option("core", "FERNET_KEY_CMD") def test_config_throw_error_when_original_and_fallback_is_absent(self): - assert configuration.has_option("core", "FERNET_KEY") - assert not configuration.has_option("core", "FERNET_KEY_CMD") + self.assertTrue(configuration.has_option("core", "FERNET_KEY")) + self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD")) FERNET_KEY = configuration.get("core", "FERNET_KEY") configuration.remove_option("core", "FERNET_KEY") @@ -769,20 +769,20 @@ class CoreTest(unittest.TestCase): exception = str(cm.exception) message = "section/key [core/fernet_key] not found in config" - assert exception == message + self.assertEqual(message, exception) # restore the conf back to the original state configuration.set("core", 
"FERNET_KEY", FERNET_KEY) - assert configuration.has_option("core", "FERNET_KEY") + self.assertTrue(configuration.has_option("core", "FERNET_KEY")) def test_config_override_original_when_non_empty_envvar_is_provided(self): key = "AIRFLOW__CORE__FERNET_KEY" value = "some value" - assert key not in os.environ + self.assertNotIn(key, os.environ) os.environ[key] = value FERNET_KEY = configuration.get('core', 'FERNET_KEY') - assert FERNET_KEY == value + self.assertEqual(value, FERNET_KEY) # restore the envvar back to the original state del os.environ[key] @@ -790,11 +790,11 @@ class CoreTest(unittest.TestCase): def test_config_override_original_when_empty_envvar_is_provided(self): key = "AIRFLOW__CORE__FERNET_KEY" value = "" - assert key not in os.environ + self.assertNotIn(key, os.environ) os.environ[key] = value FERNET_KEY = configuration.get('core', 'FERNET_KEY') - assert FERNET_KEY == value + self.assertEqual(value, FERNET_KEY) # restore the envvar back to the original state del os.environ[key] @@ -806,43 +806,43 @@ class CoreTest(unittest.TestCase): class Blah(LoggingMixin): pass - assert Blah().logger.name == "tests.core.Blah" - assert SequentialExecutor().logger.name == "airflow.executors.sequential_executor.SequentialExecutor" - assert LocalExecutor().logger.name == "airflow.executors.local_executor.LocalExecutor" + self.assertEqual("tests.core.Blah", Blah().logger.name) + self.assertEqual("airflow.executors.sequential_executor.SequentialExecutor", SequentialExecutor().logger.name) + self.assertEqual("airflow.executors.local_executor.LocalExecutor", LocalExecutor().logger.name) def test_round_time(self): rt1 = round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) - assert rt1 == datetime(2015, 1, 1, 0, 0) + self.assertEqual(datetime(2015, 1, 1, 0, 0), rt1) rt2 = round_time(datetime(2015, 1, 2), relativedelta(months=1)) - assert rt2 == datetime(2015, 1, 1, 0, 0) + self.assertEqual(datetime(2015, 1, 1, 0, 0), rt2) rt3 = round_time(datetime(2015, 9, 16, 0, 0), 
timedelta(1), datetime( 2015, 9, 14, 0, 0)) - assert rt3 == datetime(2015, 9, 16, 0, 0) + self.assertEqual(datetime(2015, 9, 16, 0, 0), rt3) rt4 = round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime( 2015, 9, 14, 0, 0)) - assert rt4 == datetime(2015, 9, 15, 0, 0) + self.assertEqual(datetime(2015, 9, 15, 0, 0), rt4) rt5 = round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime( 2015, 9, 14, 0, 0)) - assert rt5 == datetime(2015, 9, 14, 0, 0) + self.assertEqual(datetime(2015, 9, 14, 0, 0), rt5) rt6 = round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime( 2015, 9, 14, 0, 0)) - assert rt6 == datetime(2015, 9, 14, 0, 0) + self.assertEqual(datetime(2015, 9, 14, 0, 0), rt6) def test_infer_time_unit(self): - assert infer_time_unit([130, 5400, 10]) == 'minutes' + self.assertEqual('minutes', infer_time_unit([130, 5400, 10])) - assert infer_time_unit([110, 50, 10, 100]) == 'seconds' + self.assertEqual('seconds', infer_time_unit([110, 50, 10, 100])) - assert infer_time_unit([100000, 50000, 10000, 20000]) == 'hours' + self.assertEqual('hours', infer_time_unit([100000, 50000, 10000, 20000])) - assert infer_time_unit([200000, 100000]) == 'days' + self.assertEqual('days', infer_time_unit([200000, 100000])) def test_scale_time_units(self): @@ -913,7 +913,7 @@ class CoreTest(unittest.TestCase): session = settings.Session() ti.refresh_from_db(session=session) # making sure it's actually running - assert State.RUNNING == ti.state + self.assertEqual(State.RUNNING, ti.state) ti = ( session.query(TI) .filter_by( @@ -930,7 +930,7 @@ class CoreTest(unittest.TestCase): # making sure that the task ended up as failed ti.refresh_from_db(session=session) - assert State.FAILED == ti.state + self.assertEqual(State.FAILED, ti.state) session.close() def test_task_fail_duration(self): @@ -964,10 +964,10 @@ class CoreTest(unittest.TestCase): dag_id=self.dag.dag_id, execution_date=DEFAULT_DATE).all() print(f_fails) - assert len(p_fails) == 0 - assert len(f_fails) == 1 + 
self.assertEqual(0, len(p_fails)) + self.assertEqual(1, len(f_fails)) # C - assert sum([f.duration for f in f_fails]) >= 3 + self.assertGreaterEqual(sum([f.duration for f in f_fails]), 3) def test_dag_stats(self): """Correctly sets/dirties/cleans rows of DagStat table""" @@ -987,11 +987,11 @@ class CoreTest(unittest.TestCase): qry = session.query(models.DagStat).all() - assert len(qry) == 1 - assert qry[0].dag_id == self.dag_bash.dag_id and\ - qry[0].state == State.RUNNING and\ - qry[0].count == 1 and\ - qry[0].dirty == False + self.assertEqual(1, len(qry)) + self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id) + self.assertEqual(State.RUNNING, qry[0].state) + self.assertEqual(1, qry[0].count) + self.assertFalse(qry[0].dirty) run2 = self.dag_bash.create_dagrun( run_id="run2", @@ -1002,11 +1002,11 @@ class CoreTest(unittest.TestCase): qry = session.query(models.DagStat).all() - assert len(qry) == 1 - assert qry[0].dag_id == self.dag_bash.dag_id and\ - qry[0].state == State.RUNNING and\ - qry[0].count == 2 and\ - qry[0].dirty == False + self.assertEqual(1, len(qry)) + self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id) + self.assertEqual(State.RUNNING, qry[0].state) + self.assertEqual(2, qry[0].count) + self.assertFalse(qry[0].dirty) session.query(models.DagRun).first().state = State.SUCCESS session.commit() @@ -1014,18 +1014,18 @@ class CoreTest(unittest.TestCase): models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session) qry = session.query(models.DagStat).filter(models.DagStat.state == State.SUCCESS).all() - assert len(qry) == 1 - assert qry[0].dag_id == self.dag_bash.dag_id and\ - qry[0].state == State.SUCCESS and\ - qry[0].count == 1 and\ - qry[0].dirty == False + self.assertEqual(1, len(qry)) + self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id) + self.assertEqual(State.SUCCESS, qry[0].state) + self.assertEqual(1, qry[0].count) + self.assertFalse(qry[0].dirty) qry = session.query(models.DagStat).filter(models.DagStat.state == State.RUNNING).all() 
- assert len(qry) == 1 - assert qry[0].dag_id == self.dag_bash.dag_id and\ - qry[0].state == State.RUNNING and\ - qry[0].count == 1 and\ - qry[0].dirty == False + self.assertEqual(1, len(qry)) + self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id) + self.assertEqual(State.RUNNING, qry[0].state) + self.assertEqual(1, qry[0].count) + self.assertFalse(qry[0].dirty) session.query(models.DagRun).delete() session.query(models.DagStat).delete() @@ -1281,12 +1281,12 @@ class CliTests(unittest.TestCase): args = self.parser.parse_args([ 'pause', 'example_bash_operator']) cli.pause(args) - assert self.dagbag.dags['example_bash_operator'].is_paused in [True, 1] + self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, [True, 1]) args = self.parser.parse_args([ 'unpause', 'example_bash_operator']) cli.unpause(args) - assert self.dagbag.dags['example_bash_operator'].is_paused in [False, 0] + self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, [False, 0]) def test_subdag_clear(self): args = self.parser.parse_args([ @@ -1314,7 +1314,7 @@ class CliTests(unittest.TestCase): '-s', DEFAULT_DATE.isoformat()])) def test_process_subdir_path_with_placeholder(self): - assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc') + self.assertEqual(os.path.join(settings.DAGS_FOLDER, 'abc'), cli.process_subdir('DAGS_FOLDER/abc')) def test_trigger_dag(self): cli.trigger_dag(self.parser.parse_args([ @@ -1373,22 +1373,22 @@ class CliTests(unittest.TestCase): cli.variables(self.parser.parse_args([ 'variables', '-i', 'variables1.json'])) - assert models.Variable.get('bar') == 'original' - assert models.Variable.get('foo') == '{"foo": "bar"}' + self.assertEqual('original', models.Variable.get('bar')) + self.assertEqual('{"foo": "bar"}', models.Variable.get('foo')) # Second export cli.variables(self.parser.parse_args([ 'variables', '-e', 'variables2.json'])) second_exp = open('variables2.json', 'r') - assert second_exp.read() == 
first_exp.read() + self.assertEqual(first_exp.read(), second_exp.read()) second_exp.close() first_exp.close() # Second import cli.variables(self.parser.parse_args([ 'variables', '-i', 'variables2.json'])) - assert models.Variable.get('bar') == 'original' - assert models.Variable.get('foo') == '{"foo": "bar"}' + self.assertEqual('original', models.Variable.get('bar')) + self.assertEqual('{"foo": "bar"}', models.Variable.get('foo')) session = settings.Session() session.query(Variable).delete() @@ -1413,91 +1413,91 @@ class WebUiTests(unittest.TestCase): def test_index(self): response = self.app.get('/', follow_redirects=True) - assert "DAGs" in response.data.decode('utf-8') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("DAGs", response.data.decode('utf-8')) + self.assertIn("example_bash_operator", response.data.decode('utf-8')) def test_query(self): response = self.app.get('/admin/queryview/') - assert "Ad Hoc Query" in response.data.decode('utf-8') + self.assertIn("Ad Hoc Query", response.data.decode('utf-8')) response = self.app.get( "/admin/queryview/?" "conn_id=airflow_db&" "sql=SELECT+COUNT%281%29+as+TEST+FROM+task_instance") - assert "TEST" in response.data.decode('utf-8') + self.assertIn("TEST", response.data.decode('utf-8')) def test_health(self): response = self.app.get('/health') - assert 'The server is healthy!' in response.data.decode('utf-8') + self.assertIn('The server is healthy!', response.data.decode('utf-8')) def test_headers(self): response = self.app.get('/admin/airflow/headers') - assert '"headers":' in response.data.decode('utf-8') + self.assertIn('"headers":', response.data.decode('utf-8')) def test_noaccess(self): response = self.app.get('/admin/airflow/noaccess') - assert "You don't seem to have access." 
in response.data.decode('utf-8') + self.assertIn("You don't seem to have access.", response.data.decode('utf-8')) def test_pickle_info(self): response = self.app.get('/admin/airflow/pickle_info') - assert '{' in response.data.decode('utf-8') + self.assertIn('{', response.data.decode('utf-8')) def test_dag_views(self): response = self.app.get( '/admin/airflow/graph?dag_id=example_bash_operator') - assert "runme_0" in response.data.decode('utf-8') + self.assertIn("runme_0", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/tree?num_runs=25&dag_id=example_bash_operator') - assert "runme_0" in response.data.decode('utf-8') + self.assertIn("runme_0", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/duration?days=30&dag_id=example_bash_operator') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/tries?days=30&dag_id=example_bash_operator') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/landing_times?' 
'days=30&dag_id=example_bash_operator') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/gantt?dag_id=example_bash_operator') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/code?dag_id=example_bash_operator') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/blocked') response = self.app.get( '/admin/configurationview/') - assert "Airflow Configuration" in response.data.decode('utf-8') - assert "Running Configuration" in response.data.decode('utf-8') + self.assertIn("Airflow Configuration", response.data.decode('utf-8')) + self.assertIn("Running Configuration", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/rendered?' 'task_id=runme_1&dag_id=example_bash_operator&' 'execution_date={}'.format(DEFAULT_DATE_ISO)) - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/log?task_id=run_this_last&' 'dag_id=example_bash_operator&execution_date={}' ''.format(DEFAULT_DATE_ISO)) - assert "run_this_last" in response.data.decode('utf-8') + self.assertIn("run_this_last", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/task?' 
'task_id=runme_0&dag_id=example_bash_operator&' 'execution_date={}'.format(DEFAULT_DATE_DS)) - assert "Attributes" in response.data.decode('utf-8') + self.assertIn("Attributes", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/dag_stats') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/task_stats') - assert "example_bash_operator" in response.data.decode('utf-8') + self.assertIn("example_bash_operator", response.data.decode('utf-8')) url = ( "/admin/airflow/success?task_id=run_this_last&" "dag_id=example_bash_operator&upstream=false&downstream=false&" "future=false&past=false&execution_date={}&" "origin=/admin".format(DEFAULT_DATE_DS)) response = self.app.get(url) - assert "Wait a minute" in response.data.decode('utf-8') + self.assertIn("Wait a minute", response.data.decode('utf-8')) response = self.app.get(url + "&confirmed=true") response = self.app.get( '/admin/airflow/clear?task_id=run_this_last&' @@ -1505,19 +1505,19 @@ class WebUiTests(unittest.TestCase): 'upstream=true&downstream=false&' 'execution_date={}&' 'origin=/admin'.format(DEFAULT_DATE_DS)) - assert "Wait a minute" in response.data.decode('utf-8') + self.assertIn("Wait a minute", response.data.decode('utf-8')) url = ( "/admin/airflow/success?task_id=section-1&" "dag_id=example_subdag_operator&upstream=true&downstream=true&" "recursive=true&future=false&past=false&execution_date={}&" "origin=/admin".format(DEFAULT_DATE_DS)) response = self.app.get(url) - assert "Wait a minute" in response.data.decode('utf-8') - assert "section-1-task-1" in response.data.decode('utf-8') - assert "section-1-task-2" in response.data.decode('utf-8') - assert "section-1-task-3" in response.data.decode('utf-8') - assert "section-1-task-4" in response.data.decode('utf-8') - assert "section-1-task-5" in response.data.decode('utf-8') + self.assertIn("Wait a minute", 
response.data.decode('utf-8')) + self.assertIn("section-1-task-1", response.data.decode('utf-8')) + self.assertIn("section-1-task-2", response.data.decode('utf-8')) + self.assertIn("section-1-task-3", response.data.decode('utf-8')) + self.assertIn("section-1-task-4", response.data.decode('utf-8')) + self.assertIn("section-1-task-5", response.data.decode('utf-8')) response = self.app.get(url + "&confirmed=true") url = ( "/admin/airflow/clear?task_id=runme_1&" @@ -1526,7 +1526,7 @@ class WebUiTests(unittest.TestCase): "execution_date={}&" "origin=/admin".format(DEFAULT_DATE_DS)) response = self.app.get(url) - assert "Wait a minute" in response.data.decode('utf-8') + self.assertIn("Wait a minute", response.data.decode('utf-8')) response = self.app.get(url + "&confirmed=true") url = ( "/admin/airflow/run?task_id=runme_0&" @@ -1541,7 +1541,7 @@ class WebUiTests(unittest.TestCase): "/admin/airflow/paused?" "dag_id=example_python_operator&is_paused=false") response = self.app.get("/admin/xcom", follow_redirects=True) - assert "Xcoms" in response.data.decode('utf-8') + self.assertIn("Xcoms", response.data.decode('utf-8')) def test_charts(self): session = Session() @@ -1553,14 +1553,14 @@ class WebUiTests(unittest.TestCase): response = self.app.get( '/admin/airflow/chart' '?chart_id={}&iteration_no=1'.format(chart_id)) - assert "Airflow task instance by type" in response.data.decode('utf-8') + self.assertIn("Airflow task instance by type", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/chart_data' '?chart_id={}&iteration_no=1'.format(chart_id)) - assert "example" in response.data.decode('utf-8') + self.assertIn("example", response.data.decode('utf-8')) response = self.app.get( '/admin/airflow/dag_details?dag_id=example_branch_operator') - assert "run_this_first" in response.data.decode('utf-8') + self.assertIn("run_this_first", response.data.decode('utf-8')) def test_fetch_task_instance(self): url = ( @@ -1568,7 +1568,7 @@ class 
WebUiTests(unittest.TestCase): "dag_id=example_bash_operator&" "execution_date={}".format(DEFAULT_DATE_DS)) response = self.app.get(url) - assert "{}" in response.data.decode('utf-8') + self.assertIn("{}", response.data.decode('utf-8')) TI = models.TaskInstance ti = TI( @@ -1577,7 +1577,7 @@ class WebUiTests(unittest.TestCase): job.run() response = self.app.get(url) - assert "runme_0" in response.data.decode('utf-8') + self.assertIn("runme_0", response.data.decode('utf-8')) def tearDown(self): configuration.conf.set("webserver", "expose_config", "False") @@ -1624,19 +1624,19 @@ class WebPasswordAuthTest(unittest.TestCase): return self.app.get('/admin/airflow/logout', follow_redirects=True) def test_login_logout_password_auth(self): - assert configuration.getboolean('webserver', 'authenticate') is True + self.assertTrue(configuration.getboolean('webserver', 'authenticate')) response = self.login('user1', 'whatever') - assert 'Incorrect login details' in response.data.decode('utf-8') + self.assertIn('Incorrect login details', response.data.decode('utf-8')) response = self.login('airflow_passwordauth', 'wrongpassword') - assert 'Incorrect login details' in response.data.decode('utf-8') + self.assertIn('Incorrect login details', response.data.decode('utf-8')) response = self.login('airflow_passwordauth', 'password') - assert 'Data Profiling' in response.data.decode('utf-8') + self.assertIn('Data Profiling', response.data.decode('utf-8')) response = self.logout() - assert 'form-signin' in response.data.decode('utf-8') + self.assertIn('form-signin', response.data.decode('utf-8')) def test_unauthorized_password_auth(self): response = self.app.get("/admin/airflow/landing_times") @@ -1691,19 +1691,19 @@ class WebLdapAuthTest(unittest.TestCase): return self.app.get('/admin/airflow/logout', follow_redirects=True) def test_login_logout_ldap(self): - assert configuration.getboolean('webserver', 'authenticate') is True + self.assertTrue(configuration.getboolean('webserver', 
'authenticate')) response = self.login('user1', 'userx') - assert 'Incorrect login details' in response.data.decode('utf-8') + self.assertIn('Incorrect login details', response.data.decode('utf-8')) response = self.login('userz', 'user1') - assert 'Incorrect login details' in response.data.decode('utf-8') + self.assertIn('Incorrect login details', response.data.decode('utf-8')) response = self.login('user1', 'user1') - assert 'Data Profiling' in response.data.decode('utf-8') + self.assertIn('Data Profiling', response.data.decode('utf-8')) response = self.logout() - assert 'form-signin' in response.data.decode('utf-8') + self.assertIn('form-signin', response.data.decode('utf-8')) def test_unauthorized(self): response = self.app.get("/admin/airflow/landing_times") @@ -1711,8 +1711,8 @@ class WebLdapAuthTest(unittest.TestCase): def test_no_filter(self): response = self.login('user1', 'user1') - assert 'Data Profiling' in response.data.decode('utf-8') - assert 'Connections' in response.data.decode('utf-8') + self.assertIn('Data Profiling', response.data.decode('utf-8')) + self.assertIn('Connections', response.data.decode('utf-8')) def test_with_filters(self): configuration.conf.set('ldap', 'superuser_filter', @@ -1721,10 +1721,10 @@ class WebLdapAuthTest(unittest.TestCase): 'description=dataprofiler') response = self.login('dataprofiler', 'dataprofiler') - assert 'Data Profiling' in response.data.decode('utf-8') + self.assertIn('Data Profiling', response.data.decode('utf-8')) response = self.login('superuser', 'superuser') - assert 'Connections' in response.data.decode('utf-8') + self.assertIn('Connections', response.data.decode('utf-8')) def tearDown(self): configuration.load_test_config() @@ -1760,7 +1760,7 @@ class LdapGroupTest(unittest.TestCase): mu = models.User(username=user, is_superuser=False) auth = LdapUser(mu) - assert set(auth.ldap_groups) == set(users[user]) + self.assertEqual(set(users[user]), set(auth.ldap_groups)) def tearDown(self): 
configuration.load_test_config() @@ -1924,58 +1924,58 @@ class ConnectionTest(unittest.TestCase): def test_using_env_var(self): c = SqliteHook.get_connection(conn_id='test_uri') - assert c.host == 'ec2.compute.com' - assert c.schema == 'the_database' - assert c.login == 'username' - assert c.password == 'password' - assert c.port == 5432 + self.assertEqual('ec2.compute.com', c.host) + self.assertEqual('the_database', c.schema) + self.assertEqual('username', c.login) + self.assertEqual('password', c.password) + self.assertEqual(5432, c.port) def test_using_unix_socket_env_var(self): c = SqliteHook.get_connection(conn_id='test_uri_no_creds') - assert c.host == 'ec2.compute.com' - assert c.schema == 'the_database' - assert c.login is None - assert c.password is None - assert c.port is None + self.assertEqual('ec2.compute.com', c.host) + self.assertEqual('the_database', c.schema) + self.assertIsNone(c.login) + self.assertIsNone(c.password) + self.assertIsNone(c.port) def test_param_setup(self): c = models.Connection(conn_id='local_mysql', conn_type='mysql', host='localhost', login='airflow', password='airflow', schema='airflow') - assert c.host == 'localhost' - assert c.schema == 'airflow' - assert c.login == 'airflow' - assert c.password == 'airflow' - assert c.port is None + self.assertEqual('localhost', c.host) + self.assertEqual('airflow', c.schema) + self.assertEqual('airflow', c.login) + self.assertEqual('airflow', c.password) + self.assertIsNone(c.port) def test_env_var_priority(self): c = SqliteHook.get_connection(conn_id='airflow_db') - assert c.host != 'ec2.compute.com' + self.assertNotEqual('ec2.compute.com', c.host) os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \ 'postgres://username:[email protected]:5432/the_database' c = SqliteHook.get_connection(conn_id='airflow_db') - assert c.host == 'ec2.compute.com' - assert c.schema == 'the_database' - assert c.login == 'username' - assert c.password == 'password' - assert c.port == 5432 + 
self.assertEqual('ec2.compute.com', c.host) + self.assertEqual('the_database', c.schema) + self.assertEqual('username', c.login) + self.assertEqual('password', c.password) + self.assertEqual(5432, c.port) del os.environ['AIRFLOW_CONN_AIRFLOW_DB'] def test_dbapi_get_uri(self): conn = BaseHook.get_connection(conn_id='test_uri') hook = conn.get_hook() - assert hook.get_uri() == 'postgres://username:[email protected]:5432/the_database' + self.assertEqual('postgres://username:[email protected]:5432/the_database', hook.get_uri()) conn2 = BaseHook.get_connection(conn_id='test_uri_no_creds') hook2 = conn2.get_hook() - assert hook2.get_uri() == 'postgres://ec2.compute.com/the_database' + self.assertEqual('postgres://ec2.compute.com/the_database', hook2.get_uri()) def test_dbapi_get_sqlalchemy_engine(self): conn = BaseHook.get_connection(conn_id='test_uri') hook = conn.get_hook() engine = hook.get_sqlalchemy_engine() - assert isinstance(engine, sqlalchemy.engine.Engine) - assert str(engine.url) == 'postgres://username:[email protected]:5432/the_database' + self.assertIsInstance(engine, sqlalchemy.engine.Engine) + self.assertEqual('postgres://username:[email protected]:5432/the_database', str(engine.url)) class WebHDFSHookTest(unittest.TestCase): @@ -1985,12 +1985,12 @@ class WebHDFSHookTest(unittest.TestCase): def test_simple_init(self): from airflow.hooks.webhdfs_hook import WebHDFSHook c = WebHDFSHook() - assert c.proxy_user == None + self.assertIsNone(c.proxy_user) def test_init_proxy_user(self): from airflow.hooks.webhdfs_hook import WebHDFSHook c = WebHDFSHook(proxy_user='someone') - assert c.proxy_user == 'someone' + self.assertEqual('someone', c.proxy_user) try: @@ -2076,14 +2076,14 @@ class EmailTest(unittest.TestCase): def test_default_backend(self, mock_send_email): res = utils.email.send_email('to', 'subject', 'content') mock_send_email.assert_called_with('to', 'subject', 'content') - assert res == mock_send_email.return_value + 
self.assertEqual(mock_send_email.return_value, res) @mock.patch('airflow.utils.email.send_email_smtp') def test_custom_backend(self, mock_send_email): configuration.set('email', 'EMAIL_BACKEND', 'tests.core.send_email_test') utils.email.send_email('to', 'subject', 'content') send_email_test.assert_called_with('to', 'subject', 'content', files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed') - assert not mock_send_email.called + self.assertFalse(mock_send_email.called) class EmailSmtpTest(unittest.TestCase): @@ -2096,18 +2096,18 @@ class EmailSmtpTest(unittest.TestCase): attachment.write(b'attachment') attachment.seek(0) utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name]) - assert mock_send_mime.called + self.assertTrue(mock_send_mime.called) call_args = mock_send_mime.call_args[0] - assert call_args[0] == configuration.get('smtp', 'SMTP_MAIL_FROM') - assert call_args[1] == ['to'] + self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) + self.assertEqual(['to'], call_args[1]) msg = call_args[2] - assert msg['Subject'] == 'subject' - assert msg['From'] == configuration.get('smtp', 'SMTP_MAIL_FROM') - assert len(msg.get_payload()) == 2 - assert msg.get_payload()[-1].get(u'Content-Disposition') == \ - u'attachment; filename="' + os.path.basename(attachment.name) + '"' + self.assertEqual('subject', msg['Subject']) + self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) + self.assertEqual(2, len(msg.get_payload())) + self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"', + msg.get_payload()[-1].get(u'Content-Disposition')) mimeapp = MIMEApplication('attachment') - assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload() + self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload()) @mock.patch('airflow.utils.email.send_MIME_email') def test_send_bcc_smtp(self, mock_send_mime): @@ -2115,18 +2115,18 @@ class 
EmailSmtpTest(unittest.TestCase): attachment.write(b'attachment') attachment.seek(0) utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc') - assert mock_send_mime.called + self.assertTrue(mock_send_mime.called) call_args = mock_send_mime.call_args[0] - assert call_args[0] == configuration.get('smtp', 'SMTP_MAIL_FROM') - assert call_args[1] == ['to', 'cc', 'bcc'] + self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), call_args[0]) + self.assertEqual(['to', 'cc', 'bcc'], call_args[1]) msg = call_args[2] - assert msg['Subject'] == 'subject' - assert msg['From'] == configuration.get('smtp', 'SMTP_MAIL_FROM') - assert len(msg.get_payload()) == 2 - assert msg.get_payload()[-1].get(u'Content-Disposition') == \ - u'attachment; filename="' + os.path.basename(attachment.name) + '"' + self.assertEqual('subject', msg['Subject']) + self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From']) + self.assertEqual(2, len(msg.get_payload())) + self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"', + msg.get_payload()[-1].get(u'Content-Disposition')) mimeapp = MIMEApplication('attachment') - assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload() + self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload()) @mock.patch('smtplib.SMTP_SSL') @@ -2140,13 +2140,13 @@ class EmailSmtpTest(unittest.TestCase): configuration.get('smtp', 'SMTP_HOST'), configuration.getint('smtp', 'SMTP_PORT'), ) - assert mock_smtp.return_value.starttls.called + self.assertTrue(mock_smtp.return_value.starttls.called) mock_smtp.return_value.login.assert_called_with( configuration.get('smtp', 'SMTP_USER'), configuration.get('smtp', 'SMTP_PASSWORD'), ) mock_smtp.return_value.sendmail.assert_called_with('from', 'to', msg.as_string()) - assert mock_smtp.return_value.quit.called + self.assertTrue(mock_smtp.return_value.quit.called) @mock.patch('smtplib.SMTP_SSL') @mock.patch('smtplib.SMTP') 
@@ -2155,7 +2155,7 @@ class EmailSmtpTest(unittest.TestCase): mock_smtp.return_value = mock.Mock() mock_smtp_ssl.return_value = mock.Mock() utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False) - assert not mock_smtp.called + self.assertFalse(mock_smtp.called) mock_smtp_ssl.assert_called_with( configuration.get('smtp', 'SMTP_HOST'), configuration.getint('smtp', 'SMTP_PORT'), @@ -2169,19 +2169,19 @@ class EmailSmtpTest(unittest.TestCase): mock_smtp.return_value = mock.Mock() mock_smtp_ssl.return_value = mock.Mock() utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False) - assert not mock_smtp_ssl.called + self.assertFalse(mock_smtp_ssl.called) mock_smtp.assert_called_with( configuration.get('smtp', 'SMTP_HOST'), configuration.getint('smtp', 'SMTP_PORT'), ) - assert not mock_smtp.login.called + self.assertFalse(mock_smtp.login.called) @mock.patch('smtplib.SMTP_SSL') @mock.patch('smtplib.SMTP') def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl): utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=True) - assert not mock_smtp.called - assert not mock_smtp_ssl.called + self.assertFalse(mock_smtp.called) + self.assertFalse(mock_smtp_ssl.called) if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/dags/no_dags.py ---------------------------------------------------------------------- diff --git a/tests/dags/no_dags.py b/tests/dags/no_dags.py index a84b6da..759b563 100644 --- a/tests/dags/no_dags.py +++ b/tests/dags/no_dags.py @@ -11,4 +11,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-# \ No newline at end of file +# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index e520b44..7f3c285 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -1056,7 +1056,7 @@ class SchedulerJobTest(unittest.TestCase): logging.info("Test ran in %.2fs, expected %.2fs", run_duration, expected_run_duration) - assert run_duration - expected_run_duration < 5.0 + self.assertLess(run_duration - expected_run_duration, 5.0) def test_dag_with_system_exit(self): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 37f109d..346f47c 100644 --- a/tests/models.py +++ b/tests/models.py @@ -49,8 +49,8 @@ class DagTest(unittest.TestCase): """ dag = models.DAG('test-dag') - assert type(dag.params) == dict - assert len(dag.params) == 0 + self.assertEqual(dict, type(dag.params)) + self.assertEqual(0, len(dag.params)) def test_params_passed_and_params_in_default_args_no_override(self): """ @@ -68,7 +68,7 @@ class DagTest(unittest.TestCase): params_combined = params1.copy() params_combined.update(params2) - assert dag.params == params_combined + self.assertEqual(params_combined, dag.params) def test_dag_as_context_manager(self): """ @@ -121,7 +121,7 @@ class DagRunTest(unittest.TestCase): def test_id_for_date(self): run_id = models.DagRun.id_for_date( datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None)) - assert run_id == 'scheduled__2015-01-02T03:04:05', ( + self.assertEqual('scheduled__2015-01-02T03:04:05', run_id, msg= 'Generated run_id did not match expectations: {0}'.format(run_id)) @@ -139,10 +139,10 @@ class DagBagTest(unittest.TestCase): for dag_id in some_expected_dag_ids: dag = dagbag.get_dag(dag_id) - assert dag is not None - assert dag.dag_id == dag_id + 
self.assertIsNotNone(dag) + self.assertEqual(dag_id, dag.dag_id) - assert dagbag.size() >= 7 + self.assertGreaterEqual(dagbag.size(), 7) def test_get_non_existing_dag(self): """ @@ -151,7 +151,7 @@ class DagBagTest(unittest.TestCase): dagbag = models.DagBag(include_examples=True) non_existing_dag_id = "non_existing_dag_id" - assert dagbag.get_dag(non_existing_dag_id) is None + self.assertIsNone(dagbag.get_dag(non_existing_dag_id)) def test_process_file_that_contains_multi_bytes_char(self): """ @@ -163,7 +163,7 @@ class DagBagTest(unittest.TestCase): f.flush() dagbag = models.DagBag(include_examples=True) - assert dagbag.process_file(f.name) == [] + self.assertEqual([], dagbag.process_file(f.name)) def test_zip(self): """ @@ -171,7 +171,7 @@ class DagBagTest(unittest.TestCase): """ dagbag = models.DagBag() dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")) - assert dagbag.get_dag("test_zip_dag") + self.assertTrue(dagbag.get_dag("test_zip_dag")) @patch.object(DagModel,'get_current') def test_get_dag_without_refresh(self, mock_dagmodel): @@ -196,9 +196,9 @@ class DagBagTest(unittest.TestCase): processed_files = dagbag.process_file_calls # Should not call process_file agani, since it's already loaded during init. 
- assert dagbag.process_file_calls == 1 - assert dagbag.get_dag(dag_id) != None - assert dagbag.process_file_calls == 1 + self.assertEqual(1, dagbag.process_file_calls) + self.assertIsNotNone(dagbag.get_dag(dag_id)) + self.assertEqual(1, dagbag.process_file_calls) def test_get_dag_fileloc(self): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py index fec5e69..69166fd 100644 --- a/tests/operators/hive_operator.py +++ b/tests/operators/hive_operator.py @@ -83,9 +83,9 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ: hook.get_conn(self.nondefault_schema) # Verify - assert connect_mock.called + self.assertTrue(connect_mock.called) (args, kwargs) = connect_mock.call_args_list[0] - assert kwargs['database'] == self.nondefault_schema + self.assertEqual(self.nondefault_schema, kwargs['database']) def test_get_results_with_schema(self): from airflow.hooks.hive_hooks import HiveServer2Hook @@ -126,10 +126,10 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ: hook.get_records(sql, self.nondefault_schema) # Verify - assert self.connect_mock.called + self.assertTrue(self.connect_mock.called) (args, kwargs) = self.connect_mock.call_args_list[0] - assert args[0] == sql - assert kwargs['schema'] == self.nondefault_schema + self.assertEqual(sql, args[0]) + self.assertEqual(self.nondefault_schema, kwargs['schema']) @mock.patch('HiveServer2Hook.get_results', return_value={'data': []}) def test_get_pandas_df_with_schema(self, get_results_mock): @@ -143,10 +143,10 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ: hook.get_pandas_df(sql, self.nondefault_schema) # Verify - assert self.connect_mock.called + self.assertTrue(self.connect_mock.called) (args, kwargs) = self.connect_mock.call_args_list[0] - assert args[0] == sql - assert kwargs['schema'] == self.nondefault_schema + self.assertEqual(sql, 
args[0]) + self.assertEqual(self.nondefault_schema, kwargs['schema']) class HivePrestoTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/latest_only_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py index 37aec38..3ac5fac 100644 --- a/tests/operators/latest_only_operator.py +++ b/tests/operators/latest_only_operator.py @@ -77,17 +77,17 @@ class LatestOnlyOperatorTest(unittest.TestCase): latest_instances = get_task_instances('latest') exec_date_to_latest_state = { ti.execution_date: ti.state for ti in latest_instances} - assert exec_date_to_latest_state == { + self.assertEqual({ datetime.datetime(2016, 1, 1): 'success', datetime.datetime(2016, 1, 1, 12): 'success', - datetime.datetime(2016, 1, 2): 'success', - } + datetime.datetime(2016, 1, 2): 'success', }, + exec_date_to_latest_state) downstream_instances = get_task_instances('downstream') exec_date_to_downstream_state = { ti.execution_date: ti.state for ti in downstream_instances} - assert exec_date_to_downstream_state == { + self.assertEqual({ datetime.datetime(2016, 1, 1): 'skipped', datetime.datetime(2016, 1, 1, 12): 'skipped', - datetime.datetime(2016, 1, 2): 'success', - } + datetime.datetime(2016, 1, 2): 'success',}, + exec_date_to_downstream_state) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/operators.py ---------------------------------------------------------------------- diff --git a/tests/operators/operators.py b/tests/operators/operators.py index 7aaf12e..6fc2449 100644 --- a/tests/operators/operators.py +++ b/tests/operators/operators.py @@ -87,7 +87,7 @@ class MySqlTest(unittest.TestCase): h.bulk_load("test_airflow", t.name) c.execute("SELECT dummy FROM test_airflow") results = tuple(result[0] for result in c.fetchall()) - assert sorted(records) == sorted(results) + 
self.assertEqual(sorted(results), sorted(records)) def test_mysql_to_mysql(self): sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/sensors.py ---------------------------------------------------------------------- diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py index e8b272b..e77216b 100644 --- a/tests/operators/sensors.py +++ b/tests/operators/sensors.py @@ -180,4 +180,4 @@ class HdfsSensorTests(unittest.TestCase): # When # Then with self.assertRaises(AirflowSensorTimeout): - task.execute(None) \ No newline at end of file + task.execute(None) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/plugins_manager.py ---------------------------------------------------------------------- diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py index 0012cdf..520f822 100644 --- a/tests/plugins_manager.py +++ b/tests/plugins_manager.py @@ -35,19 +35,19 @@ class PluginsTest(unittest.TestCase): def test_operators(self): from airflow.operators.test_plugin import PluginOperator - assert issubclass(PluginOperator, BaseOperator) + self.assertTrue(issubclass(PluginOperator, BaseOperator)) def test_hooks(self): from airflow.hooks.test_plugin import PluginHook - assert issubclass(PluginHook, BaseHook) + self.assertTrue(issubclass(PluginHook, BaseHook)) def test_executors(self): from airflow.executors.test_plugin import PluginExecutor - assert issubclass(PluginExecutor, BaseExecutor) + self.assertTrue(issubclass(PluginExecutor, BaseExecutor)) def test_macros(self): from airflow.macros.test_plugin import plugin_macro - assert callable(plugin_macro) + self.assertTrue(callable(plugin_macro)) def test_admin_views(self): app = cached_app() @@ -55,11 +55,11 @@ class PluginsTest(unittest.TestCase): category = admin._menu_categories['Test Plugin'] [admin_view] = [v for v in category.get_children() if isinstance(v, MenuView)] - assert 
admin_view.name == 'Test View' + self.assertEqual('Test View', admin_view.name) def test_flask_blueprints(self): app = cached_app() - assert isinstance(app.blueprints['test_plugin'], Blueprint) + self.assertIsInstance(app.blueprints['test_plugin'], Blueprint) def test_menu_links(self): app = cached_app() @@ -67,4 +67,4 @@ class PluginsTest(unittest.TestCase): category = admin._menu_categories['Test Plugin'] [menu_link] = [ml for ml in category.get_children() if isinstance(ml, MenuLink)] - assert menu_link.name == 'Test Menu Link' + self.assertEqual('Test Menu Link', menu_link.name) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/www/api/experimental/test_endpoints.py ---------------------------------------------------------------------- diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py index 2134a44..8218360 100644 --- a/tests/www/api/experimental/test_endpoints.py +++ b/tests/www/api/experimental/test_endpoints.py @@ -31,16 +31,16 @@ class ApiExperimentalTests(unittest.TestCase): url_template = '/api/experimental/dags/{}/tasks/{}' response = self.app.get(url_template.format('example_bash_operator', 'runme_0')) - assert '"email"' in response.data.decode('utf-8') - assert 'error' not in response.data.decode('utf-8') + self.assertIn('"email"', response.data.decode('utf-8')) + self.assertNotIn('error', response.data.decode('utf-8')) self.assertEqual(200, response.status_code) response = self.app.get(url_template.format('example_bash_operator', 'DNE')) - assert 'error' in response.data.decode('utf-8') + self.assertIn('error', response.data.decode('utf-8')) self.assertEqual(404, response.status_code) response = self.app.get(url_template.format('DNE', 'DNE')) - assert 'error' in response.data.decode('utf-8') + self.assertIn('error', response.data.decode('utf-8')) self.assertEqual(404, response.status_code) def test_trigger_dag(self):
