stale[bot] closed pull request #3489: [AIRFLOW-2594][WIP] Remove implicit 
commits
URL: https://github.com/apache/incubator-airflow/pull/3489
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/api/common/experimental/mark_tasks.py 
b/airflow/api/common/experimental/mark_tasks.py
index e9e4fec223..101cf8cf53 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -81,7 +81,7 @@ def set_state(task, execution_date, upstream=False, 
downstream=False,
     assert task.dag is not None
     dag = task.dag
 
-    latest_execution_date = dag.latest_execution_date
+    latest_execution_date = dag.latest_execution_date()
     assert latest_execution_date is not None
 
     # determine date range of dag runs and tasks to consider
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ad114abda3..55b8ebec33 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1402,7 +1402,7 @@ def _process_dags(self, dagbag, dags, tis_out):
         """
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
-            if dag.is_paused:
+            if dag.is_paused():
                 self.log.info("Not processing DAG %s since it's paused", 
dag.dag_id)
                 continue
 
@@ -1795,7 +1795,7 @@ def process_file(self, file_path, pickle_dags=False, 
session=None):
             dag.sync_to_db()
 
         paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
-                          if dag.is_paused]
+                          if dag.is_paused()]
 
         # Pickle the DAGs (if necessary) and put them into a SimpleDag
         for dag_id in dagbag.dags:
@@ -2391,7 +2391,7 @@ def _process_backfill_task_instances(self,
                     ti_status.active_runs.remove(run)
                     executed_run_dates.append(run.execution_date)
 
-                if run.dag.is_paused:
+                if run.dag.is_paused():
                     models.DagStat.update([run.dag_id], session=session)
 
             self._log_progress(ti_status)
diff --git a/airflow/models.py b/airflow/models.py
index 704dc808cf..767ff1db31 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1241,7 +1241,6 @@ def are_dependents_done(self, session=None):
         count = ti[0][0]
         return count == len(task.downstream_task_ids)
 
-    @property
     @provide_session
     def previous_ti(self, session=None):
         """ The task instance for the task that ran before this task instance 
"""
@@ -3454,7 +3453,6 @@ def folder(self):
     def owner(self):
         return ", ".join(list(set([t.owner for t in self.tasks])))
 
-    @property
     @provide_session
     def concurrency_reached(self, session=None):
         """
@@ -3468,7 +3466,6 @@ def concurrency_reached(self, session=None):
         )
         return qry.scalar() >= self.concurrency
 
-    @property
     @provide_session
     def is_paused(self, session=None):
         """
@@ -3557,7 +3554,6 @@ def get_dagrun(self, execution_date, session=None):
 
         return dagrun
 
-    @property
     @provide_session
     def latest_execution_date(self, session=None):
         """
diff --git a/airflow/ti_deps/deps/dag_ti_slots_available_dep.py 
b/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index c3245ebe53..7fc3561c39 100644
--- a/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -26,7 +26,7 @@ class DagTISlotsAvailableDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        if ti.task.dag.concurrency_reached:
+        if ti.task.dag.concurrency_reached(session):
             yield self._failing_status(
                 reason="The maximum number of running tasks ({0}) for this 
task's DAG "
                        "'{1}' has been 
reached.".format(ti.task.dag.concurrency,
diff --git a/airflow/ti_deps/deps/dag_unpaused_dep.py 
b/airflow/ti_deps/deps/dag_unpaused_dep.py
index adf150fb07..750a3b3bc2 100644
--- a/airflow/ti_deps/deps/dag_unpaused_dep.py
+++ b/airflow/ti_deps/deps/dag_unpaused_dep.py
@@ -26,6 +26,6 @@ class DagUnpausedDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        if ti.task.dag.is_paused:
+        if ti.task.dag.is_paused(session):
             yield self._failing_status(
                 reason="Task's DAG '{0}' is paused.".format(ti.dag_id))
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py 
b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 2dd311f3aa..fd886db553 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -56,15 +56,15 @@ def _get_dep_statuses(self, ti, session, dep_context):
                     reason="This task instance was the first task instance for 
its task.")
                 return
         else:
-            dr = ti.get_dagrun()
-            last_dagrun = dr.get_previous_dagrun() if dr else None
+            dr = ti.get_dagrun(session=session)
+            last_dagrun = dr.get_previous_dagrun(session=session) if dr else 
None
 
             if not last_dagrun:
                 yield self._passing_status(
                     reason="This task instance was the first task instance for 
its task.")
                 return
 
-        previous_ti = ti.previous_ti
+        previous_ti = ti.previous_ti(session=session)
         if not previous_ti:
             yield self._failing_status(
                 reason="depends_on_past is true for this task's DAG, but the 
previous "
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 543eb41692..ce917f21d4 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -51,7 +51,7 @@ def __init__(self, dag, pickle_id=None):
         self._dag_id = dag.dag_id
         self._task_ids = [task.task_id for task in dag.tasks]
         self._full_filepath = dag.full_filepath
-        self._is_paused = dag.is_paused
+        self._is_paused = dag.is_paused()
         self._concurrency = dag.concurrency
         self._pickle_id = pickle_id
         self._task_special_args = {}
diff --git a/airflow/www/templates/airflow/dag.html 
b/airflow/www/templates/airflow/dag.html
index adb2d387da..5a5509e440 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -32,7 +32,7 @@ <h3 class="pull-left">
       {% if dag.parent_dag %}
         <span style='color:#AAA;'>SUBDAG: </span> <span> {{ dag.dag_id 
}}</span>
       {% else %}
-        <input id="pause_resume" dag_id="{{ dag.dag_id }}" type="checkbox" {{ 
"checked" if not dag.is_paused else "" }} data-toggle="toggle" data-size="mini" 
method="post">
+        <input id="pause_resume" dag_id="{{ dag.dag_id }}" type="checkbox" {{ 
"checked" if not dag.is_paused() else "" }} data-toggle="toggle" 
data-size="mini" method="post">
         <span style='color:#AAA;'>DAG: </span> <span> {{ dag.dag_id }}</span> 
<small class="text-muted"> {{ dag.description }} </small>
       {% endif %}
       {% if root %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a16b685ed7..5635fa3a91 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -324,7 +324,7 @@ def get_date_time_num_runs_dag_runs_form_data(request, 
session, dag):
     if dttm:
         dttm = pendulum.parse(dttm)
     else:
-        dttm = dag.latest_execution_date or timezone.utcnow()
+        dttm = dag.latest_execution_date(session=session) or timezone.utcnow()
 
     base_date = request.args.get('base_date')
     if base_date:
@@ -1296,7 +1296,7 @@ def tree(self, session=None):
         if base_date:
             base_date = timezone.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1514,7 +1514,7 @@ def duration(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1622,7 +1622,7 @@ def tries(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1686,7 +1686,7 @@ def landing_times(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
diff --git a/airflow/www_rbac/templates/airflow/dag.html 
b/airflow/www_rbac/templates/airflow/dag.html
index eb44908502..35e4e20250 100644
--- a/airflow/www_rbac/templates/airflow/dag.html
+++ b/airflow/www_rbac/templates/airflow/dag.html
@@ -31,7 +31,7 @@ <h3 class="pull-left">
       {% if dag.parent_dag %}
         <span style='color:#AAA;'>SUBDAG: </span> <span> {{ dag.dag_id 
}}</span>
       {% else %}
-        <input id="pause_resume" dag_id="{{ dag.dag_id }}" type="checkbox" {{ 
"checked" if not dag.is_paused else "" }} data-toggle="toggle" data-size="mini" 
method="post">
+        <input id="pause_resume" dag_id="{{ dag.dag_id }}" type="checkbox" {{ 
"checked" if not dag.is_paused() else "" }} data-toggle="toggle" 
data-size="mini" method="post">
         <span style='color:#AAA;'>DAG: </span> <span> {{ dag.dag_id }}</span> 
<small class="text-muted"> {{ dag.description }} </small>
       {% endif %}
       {% if root %}
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 3a1951a2b4..86c69cf28d 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -86,7 +86,7 @@ def get_date_time_num_runs_dag_runs_form_data(request, 
session, dag):
     if dttm:
         dttm = pendulum.parse(dttm)
     else:
-        dttm = dag.latest_execution_date or timezone.utcnow()
+        dttm = dag.latest_execution_date(session=session) or timezone.utcnow()
 
     base_date = request.args.get('base_date')
     if base_date:
@@ -201,7 +201,7 @@ def get_int_arg(value, default=0):
         # optionally filter out "paused" dags
         if hide_paused:
             unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
-                                         not dag.parent_dag and not 
dag.is_paused]
+                                         not dag.parent_dag and not 
dag.is_paused()]
 
         else:
             unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if
@@ -974,7 +974,7 @@ def tree(self, session=None):
         if base_date:
             base_date = timezone.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else timezone.utc_epoch()
@@ -1192,7 +1192,7 @@ def duration(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else timezone.utc_epoch()
@@ -1299,7 +1299,7 @@ def tries(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else timezone.utc_epoch()
@@ -1363,7 +1363,7 @@ def landing_times(self, session=None):
         if base_date:
             base_date = pendulum.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or timezone.utcnow()
+            base_date = dag.latest_execution_date(session=session) or 
timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else timezone.utc_epoch()
diff --git a/tests/core.py b/tests/core.py
index c3adb3c500..3dc5ca5a10 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1299,12 +1299,12 @@ def test_pause(self):
         args = self.parser.parse_args([
             'pause', 'example_bash_operator'])
         cli.pause(args)
-        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, 
[True, 1])
+        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused(), 
[True, 1])
 
         args = self.parser.parse_args([
             'unpause', 'example_bash_operator'])
         cli.unpause(args)
-        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, 
[False, 0])
+        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused(), 
[False, 0])
 
     def test_subdag_clear(self):
         args = self.parser.parse_args([
diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py 
b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
index 0ea1503a21..bf7c5e882e 100644
--- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
+++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
@@ -30,7 +30,7 @@ def test_concurrency_reached(self):
         """
         Test concurrency reached should fail dep
         """
-        dag = Mock(concurrency=1, concurrency_reached=True)
+        dag = Mock(concurrency=1, concurrency_reached=lambda x: True)
         task = Mock(dag=dag)
         ti = TaskInstance(task, execution_date=None)
 
@@ -40,7 +40,7 @@ def test_all_conditions_met(self):
         """
         Test all conditions met should pass dep
         """
-        dag = Mock(concurrency=1, concurrency_reached=False)
+        dag = Mock(concurrency=1, concurrency_reached=lambda x: False)
         task = Mock(dag=dag)
         ti = TaskInstance(task, execution_date=None)
 
diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py 
b/tests/ti_deps/deps/test_dag_unpaused_dep.py
index c1f4148d29..87f7c1fa24 100644
--- a/tests/ti_deps/deps/test_dag_unpaused_dep.py
+++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py
@@ -30,7 +30,7 @@ def test_concurrency_reached(self):
         """
         Test paused DAG should fail dependency
         """
-        dag = Mock(is_paused=True)
+        dag = Mock(is_paused=lambda x: True)
         task = Mock(dag=dag)
         ti = TaskInstance(task=task, execution_date=None)
 
@@ -40,7 +40,7 @@ def test_all_conditions_met(self):
         """
         Test all conditions met should pass dep
         """
-        dag = Mock(is_paused=False)
+        dag = Mock(is_paused=lambda x: False)
         task = Mock(dag=dag)
         ti = TaskInstance(task=task, execution_date=None)
 
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py 
b/tests/ti_deps/deps/test_prev_dagrun_dep.py
index a51dcd091b..4576634ddf 100644
--- a/tests/ti_deps/deps/test_prev_dagrun_dep.py
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -121,8 +121,8 @@ def test_all_met(self):
                               wait_for_downstream=True)
         prev_ti = Mock(state=State.SUCCESS,
                        are_dependents_done=Mock(return_value=True))
-        ti = Mock(task=task, previous_ti=prev_ti,
-                  execution_date=datetime(2016, 1, 2))
+        ti = Mock(task=task, execution_date=datetime(2016, 1, 2))
         dep_context = DepContext(ignore_depends_on_past=False)
+        ti.previous_ti.return_value = prev_ti
 
         self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to