ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692775501



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,6 +23,8 @@ Here's the list of all the Database Migrations that are executed via when you ru
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
+| ``7b2661a43ba3`` (head)        | ``142555e44c17`` |                 | Change TaskInstance and TaskReschedule tables from execution_date to run_id.          |
++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | ``142555e44c17`` (head)        | ``54bebd308c5f`` |                 | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun``                   |

Review comment:
       2a7f8a411

##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       2a7f8a411

##########
File path: airflow/models/taskinstance.py
##########
@@ -1723,30 +1736,22 @@ def is_eligible_to_retry(self):
 
         return self.task.retries and self.try_number <= self.max_tries
 
-    @provide_session
-    def get_template_context(self, session=None) -> Context:
+    def get_template_context(self, session: Session = None) -> Context:
         """Return TI Context"""
+        # Do not use provide_session here -- it expunges everything on exit!
+        if not session:
+            session = settings.Session()
         task = self.task
         from airflow import macros
 
         integrate_macros_plugins()
 
-        dag_run = self.get_dagrun()
-
-        # FIXME: Many tests don't create a DagRun. We should fix the tests.
-        if dag_run is None:
-            FakeDagRun = namedtuple(
-                "FakeDagRun",
-                # A minimal set of attributes to keep things working.
-                "conf data_interval_start data_interval_end external_trigger run_id",
-            )
-            dag_run = FakeDagRun(
-                conf=None,
-                data_interval_start=None,
-                data_interval_end=None,
-                external_trigger=False,
-                run_id="",
-            )
+        params = {}  # type: Dict[str, Any]
+        # Ensure that the dag_run is loaded -- otherwise `self.execution_date` may not work
+        dag_run = self.get_dagrun(session)
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
 
         params = {}  # type: Dict[str, Any]

Review comment:
       2a7f8a411




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to