Repository: incubator-airflow

Updated Branches:
  refs/heads/v1-10-test 2e59431b8 -> d8716deac (forced update)
[AIRFLOW-2474] Only import snakebite if using py2

Closes #3365 from jgao54/snakebite-import

(cherry picked from commit c414670a095c97d41fe399948ebc9ed6d20be09d)
Signed-off-by: Kaxil Naik <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8716dea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8716dea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8716dea

Branch: refs/heads/v1-10-test
Commit: d8716deaccdcfc2fd25fb093922169d70fbd319f
Parents: 4c70c17
Author: Joy Gao <[email protected]>
Authored: Thu May 17 10:05:52 2018 +0100
Committer: Kaxil Naik <[email protected]>
Committed: Thu May 17 12:03:32 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/hdfs_hook.py |   8 +--
 tests/core.py              |   5 +-
 tests/jobs.py              | 154 +++++++++++++---------------------------
 3 files changed, 56 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8716dea/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index c6e68c8..a87fa94 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -17,14 +17,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from six import PY2
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
 
-try:
-    snakebite_imported = True
+snakebite_imported = False
+if PY2:
     from snakebite.client import Client, HAClient, Namenode, AutoConfigClient
-except ImportError:
-    snakebite_imported = False
+    snakebite_imported = True
 
 from airflow.exceptions import AirflowException
 
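The new guard works because snakebite is a Python-2-only client: under Python 3 the import is never attempted, and snakebite_imported stays False so the hook can refuse to connect later with a clear message. A minimal standalone sketch of the same pattern (the get_client helper and its parameters are illustrative, not part of this commit):

    from six import PY2

    snakebite_imported = False
    if PY2:
        from snakebite.client import Client
        snakebite_imported = True

    def get_client(namenode_host, namenode_port=8020):
        # Fail fast with a clear message when the py2-only dependency is
        # missing, rather than hitting a NameError on Client below.
        if not snakebite_imported:
            raise ImportError('snakebite requires Python 2')
        return Client(namenode_host, namenode_port)

One trade-off of the explicit version check: a Python 2 environment without snakebite installed now fails at import time instead of silently setting the flag to False, which surfaces the missing dependency sooner.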
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8716dea/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ce32482..b032882 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2293,11 +2293,10 @@ class WebHDFSHookTest(unittest.TestCase):
         self.assertEqual('someone', c.proxy_user)
 
 
-try:
+HDFSHook = None
+if six.PY2:
     from airflow.hooks.hdfs_hook import HDFSHook
     import snakebite
-except ImportError:
-    HDFSHook = None
 
 
 @unittest.skipIf(HDFSHook is None,
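tests/core.py applies the same idea at the test layer: the hook name resolves to None unless the interpreter is Python 2, and unittest.skipIf turns that into a suite-level skip instead of a collection-time ImportError. A hedged sketch of the idiom with illustrative names (SomeHook and SomeHookTest are not in the commit):

    import unittest

    import six

    SomeHook = None
    if six.PY2:
        # Stand-in for the real import of HDFSHook from
        # airflow.hooks.hdfs_hook in tests/core.py.
        from airflow.hooks.hdfs_hook import HDFSHook as SomeHook

    @unittest.skipIf(SomeHook is None,
                     "skipped: hook depends on the py2-only snakebite client")
    class SomeHookTest(unittest.TestCase):
        def test_hook_is_importable(self):
            self.assertIsNotNone(SomeHook)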
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8716dea/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 9e07645..09ed5b8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2750,131 +2750,77 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date
         """
-        now = timezone.utcnow()
-        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
-        three_minutes_ago = now - datetime.timedelta(minutes=3)
-        two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)
+        def setup_dag(dag_id, schedule_interval, start_date, catchup):
+            default_args = {
+                'owner': 'airflow',
+                'depends_on_past': False,
+                'start_date': start_date
+            }
+            dag = DAG(dag_id,
+                      schedule_interval=schedule_interval,
+                      max_active_runs=1,
+                      catchup=catchup,
+                      default_args=default_args)
+
+            t1 = DummyOperator(task_id='t1', dag=dag)
+            t2 = DummyOperator(task_id='t2', dag=dag)
+            t2.set_upstream(t1)
+            t3 = DummyOperator(task_id='t3', dag=dag)
+            t3.set_upstream(t2)
+
+            session = settings.Session()
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+            session.commit()
+            session.close()
 
-        START_DATE = six_hours_ago_to_the_hour
-        DAG_NAME1 = 'no_catchup_test1'
-        DAG_NAME2 = 'no_catchup_test2'
-        DAG_NAME3 = 'no_catchup_test3'
-        DAG_NAME4 = 'no_catchup_test4'
+            return dag
 
-        default_args = {
-            'owner': 'airflow',
-            'depends_on_past': False,
-            'start_date': START_DATE
+        now = timezone.utcnow()
+        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
+            minute=0, second=0, microsecond=0)
+        half_an_hour_ago = now - datetime.timedelta(minutes=30)
+        two_hours_ago = now - datetime.timedelta(hours=2)
 
-        }
-        dag1 = DAG(DAG_NAME1,
-                   schedule_interval='* * * * *',
-                   max_active_runs=1,
-                   default_args=default_args
-                   )
+        scheduler = SchedulerJob()
+        dag1 = setup_dag(dag_id='dag_with_catchup',
+                         schedule_interval='* * * * *',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=True)
         default_catchup = configuration.conf.getboolean('scheduler',
                                                         'catchup_by_default')
-        # Test configs have catchup by default ON
-        self.assertEqual(default_catchup, True)
-
-        # Correct default?
         self.assertEqual(dag1.catchup, True)
 
-        dag2 = DAG(DAG_NAME2,
-                   schedule_interval='* * * * *',
-                   max_active_runs=1,
-                   catchup=False,
-                   default_args=default_args
-                   )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag2.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag2.clear()
-
+        dag2 = setup_dag(dag_id='dag_without_catchup_ten_minute',
+                         schedule_interval='*/10 * * * *',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag2)
-        # We had better get a dag run
         self.assertIsNotNone(dr)
-
-        # The DR should be scheduled in the last 3 minutes, not 6 hours ago
-        self.assertGreater(dr.execution_date, three_minutes_ago)
-
+        # The DR should be scheduled in the last half an hour, not 6 hours ago
+        self.assertGreater(dr.execution_date, half_an_hour_ago)
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, timezone.utcnow())
 
-        dag3 = DAG(DAG_NAME3,
-                   schedule_interval='@hourly',
-                   max_active_runs=1,
-                   catchup=False,
-                   default_args=default_args
-                   )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag3.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag3.clear()
-
-        dr = None
+        dag3 = setup_dag(dag_id='dag_without_catchup_hourly',
+                         schedule_interval='@hourly',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag3)
-        # We had better get a dag run
        self.assertIsNotNone(dr)
-
-        # The DR should be scheduled in the last two hours, not 6 hours ago
-        self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
-
+        # The DR should be scheduled in the last 2 hours, not 6 hours ago
+        self.assertGreater(dr.execution_date, two_hours_ago)
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, timezone.utcnow())
 
-        # check @once schedule
-        dag4 = DAG(DAG_NAME4,
-                   schedule_interval='@once',
-                   max_active_runs=1,
-                   catchup=False,
-                   default_args=default_args
-                   )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag4)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag4)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag4)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag4.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag4.clear()
-
-        dr = None
+        dag4 = setup_dag(dag_id='dag_without_catchup_once',
+                         schedule_interval='@once',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag4)
-
-        # We had better get a dag run
         self.assertIsNotNone(dr)
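The test refactor folds four copies of DAG-plus-operators setup into a single setup_dag helper and widens the timing bounds to match the new schedule intervals: on a */10 cron the start of the latest complete interval is at most about twenty minutes in the past, so half an hour is a safe lower bound for the run's execution_date (and two hours for @hourly). A sketch of that arithmetic on its own, outside any Airflow API (variable names are illustrative):

    import datetime

    now = datetime.datetime.utcnow()
    half_an_hour_ago = now - datetime.timedelta(minutes=30)

    # Latest 10-minute tick at or before now.
    last_tick = now.replace(minute=(now.minute // 10) * 10,
                            second=0, microsecond=0)
    # The run covering the latest *complete* interval starts one tick
    # earlier, i.e. between 10 and 20 minutes ago -- always inside the
    # half-hour bound the test asserts against.
    latest_complete = last_tick - datetime.timedelta(minutes=10)

    assert half_an_hour_ago < latest_complete < now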
