Repository: incubator-airflow
Updated Branches:
  refs/heads/master c414670a0 -> 4ce250295 (forced update)


[AIRFLOW-2474] Only import snakebite if using py2

Closes #3365 from jgao54/snakebite-import


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4ce25029
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4ce25029
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4ce25029

Branch: refs/heads/master
Commit: 4ce25029524e10770c9047ec04d3ab9c6e257cf4
Parents: b17a99d
Author: Joy Gao <joy...@apache.org>
Authored: Thu May 17 10:05:52 2018 +0100
Committer: Kaxil Naik <kaxiln...@gmail.com>
Committed: Thu May 17 11:37:03 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/hdfs_hook.py |   8 +--
 tests/core.py              |   5 +-
 tests/jobs.py              | 154 +++++++++++++---------------------------
 3 files changed, 56 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce25029/airflow/hooks/hdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hdfs_hook.py b/airflow/hooks/hdfs_hook.py
index c6e68c8..a87fa94 100644
--- a/airflow/hooks/hdfs_hook.py
+++ b/airflow/hooks/hdfs_hook.py
@@ -17,14 +17,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from six import PY2
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
 
-try:
-    snakebite_imported = True
+snakebite_imported = False
+if PY2:
     from snakebite.client import Client, HAClient, Namenode, AutoConfigClient
-except ImportError:
-    snakebite_imported = False
+    snakebite_imported = True
 
 from airflow.exceptions import AirflowException
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce25029/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index ddcec42..83737ed 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2313,11 +2313,10 @@ class WebHDFSHookTest(unittest.TestCase):
         self.assertEqual('someone', c.proxy_user)
 
 
-try:
+HDFSHook = None
+if six.PY2:
     from airflow.hooks.hdfs_hook import HDFSHook
     import snakebite
-except ImportError:
-    HDFSHook = None
 
 
 @unittest.skipIf(HDFSHook is None,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4ce25029/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 9e07645..09ed5b8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2750,131 +2750,77 @@ class SchedulerJobTest(unittest.TestCase):
         Test to check that a DAG with catchup = False only schedules beginning now, not back to the start date
         """
 
-        now = timezone.utcnow()
-        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
-        three_minutes_ago = now - datetime.timedelta(minutes=3)
-        two_hours_and_three_minutes_ago = three_minutes_ago - datetime.timedelta(hours=2)
+        def setup_dag(dag_id, schedule_interval, start_date, catchup):
+            default_args = {
+                'owner': 'airflow',
+                'depends_on_past': False,
+                'start_date': start_date
+            }
+            dag = DAG(dag_id,
+                      schedule_interval=schedule_interval,
+                      max_active_runs=1,
+                      catchup=catchup,
+                      default_args=default_args)
+
+            t1 = DummyOperator(task_id='t1', dag=dag)
+            t2 = DummyOperator(task_id='t2', dag=dag)
+            t2.set_upstream(t1)
+            t3 = DummyOperator(task_id='t3', dag=dag)
+            t3.set_upstream(t2)
+
+            session = settings.Session()
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            session.merge(orm_dag)
+            session.commit()
+            session.close()
 
-        START_DATE = six_hours_ago_to_the_hour
-        DAG_NAME1 = 'no_catchup_test1'
-        DAG_NAME2 = 'no_catchup_test2'
-        DAG_NAME3 = 'no_catchup_test3'
-        DAG_NAME4 = 'no_catchup_test4'
+            return dag
 
-        default_args = {
-            'owner': 'airflow',
-            'depends_on_past': False,
-            'start_date': START_DATE
+        now = timezone.utcnow()
+        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
+            minute=0, second=0, microsecond=0)
+        half_an_hour_ago = now - datetime.timedelta(minutes=30)
+        two_hours_ago = now - datetime.timedelta(hours=2)
 
-        }
-        dag1 = DAG(DAG_NAME1,
-                  schedule_interval='* * * * *',
-                  max_active_runs=1,
-                  default_args=default_args
-                  )
+        scheduler = SchedulerJob()
 
+        dag1 = setup_dag(dag_id='dag_with_catchup',
+                         schedule_interval='* * * * *',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=True)
         default_catchup = configuration.conf.getboolean('scheduler', 'catchup_by_default')
-        # Test configs have catchup by default ON
-
         self.assertEqual(default_catchup, True)
-
-        # Correct default?
         self.assertEqual(dag1.catchup, True)
 
-        dag2 = DAG(DAG_NAME2,
-                  schedule_interval='* * * * *',
-                  max_active_runs=1,
-                  catchup=False,
-                  default_args=default_args
-                  )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag2)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag2)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag2)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag2.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag2.clear()
-
+        dag2 = setup_dag(dag_id='dag_without_catchup_ten_minute',
+                         schedule_interval='*/10 * * * *',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag2)
-
         # We had better get a dag run
         self.assertIsNotNone(dr)
-
-        # The DR should be scheduled in the last 3 minutes, not 6 hours ago
-        self.assertGreater(dr.execution_date, three_minutes_ago)
-
+        # The DR should be scheduled in the last half an hour, not 6 hours ago
+        self.assertGreater(dr.execution_date, half_an_hour_ago)
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, timezone.utcnow())
 
-        dag3 = DAG(DAG_NAME3,
-                   schedule_interval='@hourly',
-                   max_active_runs=1,
-                   catchup=False,
-                   default_args=default_args
-                   )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag3)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag3)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag3)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag3.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag3.clear()
-
-        dr = None
+        dag3 = setup_dag(dag_id='dag_without_catchup_hourly',
+                         schedule_interval='@hourly',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag3)
-
         # We had better get a dag run
         self.assertIsNotNone(dr)
-
-        # The DR should be scheduled in the last two hours, not 6 hours ago
-        self.assertGreater(dr.execution_date, two_hours_and_three_minutes_ago)
-
+        # The DR should be scheduled in the last 2 hours, not 6 hours ago
+        self.assertGreater(dr.execution_date, two_hours_ago)
         # The DR should be scheduled BEFORE now
         self.assertLess(dr.execution_date, timezone.utcnow())
 
-        # check @once schedule
-        dag4 = DAG(DAG_NAME4,
-                   schedule_interval='@once',
-                   max_active_runs=1,
-                   catchup=False,
-                   default_args=default_args
-                   )
-
-        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag4)
-        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag4)
-        run_this_2.set_upstream(run_this_1)
-        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag4)
-        run_this_3.set_upstream(run_this_2)
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag4.dag_id)
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        dag4.clear()
-
-        dr = None
+        dag4 = setup_dag(dag_id='dag_without_catchup_once',
+                         schedule_interval='@once',
+                         start_date=six_hours_ago_to_the_hour,
+                         catchup=False)
         dr = scheduler.create_dag_run(dag4)
-
-        # We had better get a dag run
         self.assertIsNotNone(dr)
 
 

Reply via email to