ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692767456
##########
File path: airflow/models/taskinstance.py
##########
@@ -389,27 +399,53 @@ class TaskInstance(Base, LoggingMixin):
innerjoin=True,
)
- def __init__(self, task, execution_date: datetime, state: Optional[str] = None):
+ dag_run = relationship("DagRun", back_populates="task_instances")
+
+ execution_date = association_proxy("dag_run", "execution_date")
+
+ def __init__(
+ self, task, execution_date: Optional[datetime] = None, run_id: str = None, state: Optional[str] = None
+ ):
super().__init__()
self.dag_id = task.dag_id
self.task_id = task.task_id
self.refresh_from_task(task)
self._log = logging.getLogger("airflow.task")
- # make sure we have a localized execution_date stored in UTC
- if execution_date and not timezone.is_localized(execution_date):
- self.log.warning(
- "execution date %s has no timezone information. Using default from dag or system",
- execution_date,
+ if execution_date:
+ from airflow.models.dagrun import DagRun # Avoid circular import
+
+ warnings.warn(
+ "Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
+ DeprecationWarning,
+ # Stack level is 4 because SQLA adds some wrappers around the constructor
+ stacklevel=4,
)
- if self.task.has_dag():
- execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
- else:
- execution_date = timezone.make_aware(execution_date)
+ # make sure we have a localized execution_date stored in UTC
+ if execution_date and not timezone.is_localized(execution_date):
+ self.log.warning(
+ "execution date %s has no timezone information. Using default from dag or system",
+ execution_date,
+ )
+ if self.task.has_dag():
+ execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
+ else:
+ execution_date = timezone.make_aware(execution_date)
- execution_date = timezone.convert_to_utc(execution_date)
+ execution_date = timezone.convert_to_utc(execution_date)
+ with create_session() as session:
+ try:
+ (run_id,) = (
+ session.query(DagRun.run_id)
+ .filter_by(dag_id=self.dag_id, execution_date=execution_date)
+ .one()
+ )
+ except NoResultFound:
Review comment:
Oh yes that's nicer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]