uranusjr commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692719030



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,6 +23,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
 
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                             |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3`` (head)        | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.           |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``142555e44c17`` (head)        | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                    |

Review comment:
       ```suggestion
    | ``142555e44c17``               | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                    |
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger 
run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       The first `params` is shadowed. Accidental?

##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default 
from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated 
in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using 
default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       ```suggestion
                   run_id = (
                       session.query(DagRun.run_id)
                       .filter_by(dag_id=self.dag_id, execution_date=execution_date)
                       .scalar()
                   )
                   if run_id is None:
   ```
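
   For reference, the behaviour the suggestion leans on: `Query.one()` raises `NoResultFound` when no row matches, while `Query.scalar()` returns `None` (it still raises `MultipleResultsFound` for more than one row), so the `try`/`except` collapses into a plain `if`. A minimal sketch, assuming a SQLAlchemy `session` and the `DagRun` model exactly as in the diff:

   ```python
   # scalar() returns None for zero rows instead of raising, so the
   # missing-DagRun fallback becomes an ordinary conditional.
   run_id = (
       session.query(DagRun.run_id)
       .filter_by(dag_id=self.dag_id, execution_date=execution_date)
       .scalar()
   )
   if run_id is None:
       ...  # same fallback the PR currently takes in the except branch
   ```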

##########
File path: airflow/ti_deps/deps/dagrun_exists_dep.py
##########
@@ -29,27 +29,9 @@ class DagrunRunningDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        dag = ti.task.dag
-        dagrun = ti.get_dagrun(session)
-        if not dagrun:
-            # The import is needed here to avoid a circular dependency
-            from airflow.models.dagrun import DagRun
-
-            running_dagruns = DagRun.find(
-                dag_id=dag.dag_id, state=State.RUNNING, external_trigger=False, session=session
+        dr = ti.get_dagrun(session)
+        if dr.state != State.RUNNING:
+            yield self._failing_status(
+                reason="Task instance's dagrun was not in the 'running' state 
but in "
+                "the state '{}'.".format(dr.state)
             )
-
-            if len(running_dagruns) >= dag.max_active_runs:
-                reason = (
-                    "The maximum number of active dag runs ({}) for this task "
-                    "instance's DAG '{}' has been 
reached.".format(dag.max_active_runs, ti.dag_id)
-                )
-            else:
-                reason = "Unknown reason"
-            yield self._failing_status(reason=f"Task instance's dagrun did not 
exist: {reason}.")

Review comment:
       Why is it OK to drop this logic?

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -60,8 +61,12 @@ class BigQueryConsoleLink(BaseOperatorLink):
     name = 'BigQuery Console'
 
     def get_link(self, operator, dttm):
-        ti = TaskInstance(task=operator, execution_date=dttm)
-        job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
+        job_id = XCom.get_one(
+            dag_ids=operator.dag.dag_id,
+            task_ids=operator.task_id,
+            execution_date=dttm,
+            key='job_id',
+        )

Review comment:
       Also #16370. Should we deprecate using `execution_date` in `XCom.get_one()` and `XCom.get_many()`? If we do, how do we keep backward compatibility here without emitting the DeprecationWarning?
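
   One hypothetical shape for that (none of this is existing Airflow API; `_get_one` is an invented name): the public method emits the warning, and internal call sites like this operator link route through a private variant that does not.

   ```python
   import warnings

   class XCom:
       @classmethod
       def get_one(cls, *, execution_date=None, **kwargs):
           # Public entry point: warn only when the deprecated argument is used.
           if execution_date is not None:
               warnings.warn(
                   "Passing execution_date to XCom.get_one() is deprecated; "
                   "pass run_id instead.",
                   DeprecationWarning,
                   stacklevel=2,
               )
           return cls._get_one(execution_date=execution_date, **kwargs)

       @classmethod
       def _get_one(cls, *, execution_date=None, run_id=None, **kwargs):
           # Hypothetical private variant: same lookup, no warning, so
           # internal back-compat callers stay silent.
           ...
   ```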

##########
File path: airflow/models/taskinstance.py
##########
@@ -1092,13 +1102,16 @@ def get_dagrun(self, session: Session = None):
         :param session: SQLAlchemy ORM Session
         :return: DagRun
         """
+        info = inspect(self)
+        if info.attrs.dag_run.loaded_value is not NO_VALUE:
+            return self.dag_run
+
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
-        dr = (
-            session.query(DagRun)
-            .filter(DagRun.dag_id == self.dag_id, DagRun.execution_date == self.execution_date)
-            .first()
-        )
+        dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
+
+        # Record it in the instance for next timme. This means that `self.execution_date` will work correctly
+        set_committed_value(self, 'dag_run', dr)

Review comment:
       ```suggestion
            # Record it in the instance for next time. This means that `self.execution_date` will work correctly
           set_committed_value(self, 'dag_run', dr)
   ```
   
   This also means `info.attrs.dag_run` will become loaded next time this is called, right?
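
   If it is, a quick way to see the caching (a sketch; assumes a freshly queried `TaskInstance` `ti` whose `dag_run` relationship has not been loaded yet, and the `dr` fetched by the query above):

   ```python
   from sqlalchemy import inspect
   from sqlalchemy.orm.attributes import set_committed_value
   from sqlalchemy.orm.base import NO_VALUE

   # Before: the relationship has never been loaded on this instance.
   assert inspect(ti).attrs.dag_run.loaded_value is NO_VALUE

   # get_dagrun() does this after its query...
   set_committed_value(ti, 'dag_run', dr)

   # ...so the next call sees a loaded value and returns self.dag_run directly.
   assert inspect(ti).attrs.dag_run.loaded_value is not NO_VALUE
   ```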

##########
File path: airflow/ti_deps/dep_context.py
##########
@@ -85,23 +84,16 @@ def __init__(
         self.ignore_ti_state = ignore_ti_state
         self.finished_tasks = finished_tasks
 
-    def ensure_finished_tasks(self, dag, execution_date: pendulum.DateTime, session: Session):
+    def ensure_finished_tasks(self, dag_run, session: Session):

Review comment:
       ```suggestion
       def ensure_finished_tasks(self, dag_run: "DagRun", session: Session):
   ```
   
   (And an import of course.)
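
   Presumably the usual guarded import, since importing `DagRun` at runtime here would be circular. A sketch of what would go at the top of `dep_context.py`:

   ```python
   from typing import TYPE_CHECKING

   if TYPE_CHECKING:
       from airflow.models.dagrun import DagRun  # only needed for the annotation
   ```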

##########
File path: airflow/models/taskinstance.py
##########
@@ -2241,8 +2247,10 @@ def xcom_pull(
         if dag_id is None:
             dag_id = self.dag_id
 
+        execution_date = self.get_dagrun(session).execution_date
+
         query = XCom.get_many(
-            execution_date=self.execution_date,
+            execution_date=execution_date,

Review comment:
       Backref-ing #16370 so we remember to change this when XCom uses run_id.

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -324,11 +322,11 @@ def test_try_adopt_task_instances(self):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
 
-        ti1 = TaskInstance(task=task_1, execution_date=exec_date)
+        ti1 = TaskInstance(task=task_1, run_id=None)

Review comment:
       I feel we should create a real DagRun here that uses `exec_date` instead.
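
   Something along these lines, perhaps (a sketch; `dag`, `task_1`, and `exec_date` come from the surrounding test, the run_id value is made up, and the exact `create_dagrun` arguments needed at this point in the PR are an assumption):

   ```python
   dag_run = dag.create_dagrun(
       run_id="test_try_adopt",  # hypothetical run_id for this test
       state=State.RUNNING,      # State is already imported by the test module
       execution_date=exec_date,
   )
   ti1 = TaskInstance(task=task_1, run_id=dag_run.run_id)
   ```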

##########
File path: airflow/www/utils.py
##########
@@ -473,6 +474,12 @@ def is_extendedjson(self, col_name):
             )
         return False
 
+    def get_col_default(self, col_name: str) -> Any:
+        if col_name not in self.list_columns:
+            # Handle AssociationProxy etc, or anything that isn't a "real" column
+            return None
+        return super().get_col_default(col_name)

Review comment:
       I’m not sure what to think about this (not your code but the fact we need this block of code in the first place).

##########
File path: tests/executors/test_celery_executor.py
##########
@@ -363,8 +360,8 @@ def test_check_for_stalled_adopted_tasks(self):
             task_1 = BaseOperator(task_id="task_1", start_date=start_date)
             task_2 = BaseOperator(task_id="task_2", start_date=start_date)
 
-        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, exec_date, try_number)
-        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, exec_date, try_number)
+        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, "runid", try_number)
+        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, "runid", try_number)

Review comment:
       Less sure about this; is creating real DagRun and TaskInstance objects worthwhile?

##########
File path: tests/jobs/test_triggerer_job.py
##########
@@ -305,22 +303,14 @@ def test_invalid_trigger(session):
     session.commit()
 
     # Create the test DAG and task
-    with DAG(
-        dag_id='test_invalid_trigger',
-        start_date=timezone.datetime(2016, 1, 1),
-        schedule_interval='@once',
-        max_active_runs=1,
-    ):
-        task1 = DummyOperator(task_id='dummy1')
+    with dag_maker(dag_id='test_invalid_trigger'):
+        DummyOperator(task_id='dummy1')
 
+    dr = dag_maker.create_dagrun()

Review comment:
       Do we need to pass `session` in?

##########
File path: airflow/www/views.py
##########
@@ -1096,7 +1097,8 @@ def rendered_k8s(self):
         logging.info("Retrieving rendered templates.")
         dag = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        ti = models.TaskInstance(task=task, execution_date=dttm)
+        dag_run = dag.get_dagrun(execution_date=execution_date)
+        ti = dag_run.get_task_instance(task_id=task.task_id)

Review comment:
       At some point we should change these views to use `run_id` right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

