ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692767456



##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
         innerjoin=True,
     )
 
-    def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+    dag_run = relationship("DagRun", back_populates="task_instances")
+
+    execution_date = association_proxy("dag_run", "execution_date")
+
+    def __init__(
+        self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+    ):
         super().__init__()
         self.dag_id = task.dag_id
         self.task_id = task.task_id
         self.refresh_from_task(task)
         self._log = logging.getLogger("airflow.task")
 
-        # make sure we have a localized execution_date stored in UTC
-        if execution_date and not timezone.is_localized(execution_date):
-            self.log.warning(
-                "execution date %s has no timezone information. Using default from dag or system",
-                execution_date,
+        if execution_date:
+            from airflow.models.dagrun import DagRun  # Avoid circular import
+
+            warnings.warn(
+                "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+                DeprecationWarning,
+                # Stack level is 4 because SQLA adds some wrappers around the constructor
+                stacklevel=4,
             )
-            if self.task.has_dag():
-                execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
-            else:
-                execution_date = timezone.make_aware(execution_date)
+            # make sure we have a localized execution_date stored in UTC
+            if execution_date and not timezone.is_localized(execution_date):
+                self.log.warning(
+                    "execution date %s has no timezone information. Using default from dag or system",
+                    execution_date,
+                )
+                if self.task.has_dag():
+                    execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+                else:
+                    execution_date = timezone.make_aware(execution_date)
 
-            execution_date = timezone.convert_to_utc(execution_date)
+                execution_date = timezone.convert_to_utc(execution_date)
+            with create_session() as session:
+                try:
+                    (run_id,) = (
+                        session.query(DagRun.run_id)
+                        .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+                        .one()
+                    )
+                except NoResultFound:

Review comment:
       Oh yes that's nicer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to