jason810496 commented on code in PR #62259:
URL: https://github.com/apache/airflow/pull/62259#discussion_r2894785172
##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -291,16 +316,23 @@ def _trigger_dag_af_3(self, context, run_id,
parsed_logical_date):
deferrable=self.deferrable,
)
- if self.note and "note" in
inspect.signature(DagRunTriggerException.__init__).parameters:
+ parameters =
inspect.signature(DagRunTriggerException.__init__).parameters
+ if self.note and "note" in parameters:
kwargs_accepted["note"] = self.note
+ if parsed_run_after and "run_after" in parameters:
+ kwargs_accepted["run_after"] = parsed_run_after
+
raise DagRunTriggerException(**kwargs_accepted)
def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
try:
if self.note:
self.log.warning("Parameter 'note' is not supported in Airflow
2.x and will be ignored.")
+ if self.run_after is not NOTSET:
Review Comment:
Correct! I just double check on whether we support `run_after` in AF2.
The `DagRun.run_after` was introduced in `3.0.0`
https://github.com/apache/airflow/blob/243fe86d4b3e59bb12977b3e36ca3f2ed27ca0f8/airflow-core/src/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py#L18-L62
and the AF2 REST API didn't support `run_after`, but it support
`execution_date` (the `logical_date` in AF3).
https://github.com/apache/airflow/blob/243fe86d4b3e59bb12977b3e36ca3f2ed27ca0f8/airflow-core/src/airflow/migrations/versions/0058_3_0_0_add_dagrun_run_after.py#L18-L62
##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -215,18 +217,38 @@ def __init__(
f"Expected str, datetime.datetime, or None for parameter
'logical_date'. Got {type(logical_date).__name__}"
)
+ self.run_after = run_after
+ if run_after is NOTSET:
+ self.run_after = NOTSET
+ elif run_after is None or isinstance(run_after, (str,
datetime.datetime)):
+ self.run_after = run_after
+ else:
+ raise TypeError(
+ f"Expected str, datetime.datetime, or None for parameter
'run_after'. Got {type(run_after).__name__}"
+ )
Review Comment:
How about adding a method to handle the common logic between `run_after` and
`logical_date`?
##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -291,16 +316,23 @@ def _trigger_dag_af_3(self, context, run_id,
parsed_logical_date):
deferrable=self.deferrable,
)
- if self.note and "note" in
inspect.signature(DagRunTriggerException.__init__).parameters:
+ parameters =
inspect.signature(DagRunTriggerException.__init__).parameters
+ if self.note and "note" in parameters:
kwargs_accepted["note"] = self.note
+ if parsed_run_after and "run_after" in parameters:
+ kwargs_accepted["run_after"] = parsed_run_after
+
raise DagRunTriggerException(**kwargs_accepted)
def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
try:
if self.note:
self.log.warning("Parameter 'note' is not supported in Airflow
2.x and will be ignored.")
+ if self.run_after is not NOTSET:
+ self.log.warning("Parameter 'run_after' is not supported in
Airflow 2.x and will be ignored.")
Review Comment:
How about adding attribute like `attrs_not_allowed_in_af2` (perhaps better
naming) for better maintainability as we might add more attributes for
`TriggerDagRunOperator` in the long term?
##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -694,12 +694,17 @@ def trigger(
run_id: str,
conf: dict | None = None,
logical_date: datetime | None = None,
+ run_after: datetime | None = None,
reset_dag_run: bool = False,
Review Comment:
We need to add exection API migration version file like the last adding
`note` one.
##########
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py:
##########
@@ -215,18 +217,38 @@ def __init__(
f"Expected str, datetime.datetime, or None for parameter
'logical_date'. Got {type(logical_date).__name__}"
)
+ self.run_after = run_after
+ if run_after is NOTSET:
+ self.run_after = NOTSET
+ elif run_after is None or isinstance(run_after, (str,
datetime.datetime)):
+ self.run_after = run_after
+ else:
+ raise TypeError(
+ f"Expected str, datetime.datetime, or None for parameter
'run_after'. Got {type(run_after).__name__}"
+ )
+
if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS:
raise NotImplementedError("Setting `fail_when_dag_is_paused` not
yet supported for Airflow 3.x")
def execute(self, context: Context):
if self.logical_date is NOTSET:
- # If no logical_date is provided we will set utcnow()
- parsed_logical_date = timezone.utcnow()
+ if self.run_after is not NOTSET:
+ parsed_logical_date = None
+ else:
+ # If no logical_date is provided we will set utcnow()
+ parsed_logical_date = timezone.utcnow()
elif self.logical_date is None or isinstance(self.logical_date,
datetime.datetime):
parsed_logical_date = self.logical_date # type: ignore
elif isinstance(self.logical_date, str):
parsed_logical_date = timezone.parse(self.logical_date)
+ if self.run_after is NOTSET:
+ parsed_run_after = parsed_logical_date
+ elif self.run_after is None or isinstance(self.run_after,
datetime.datetime):
+ parsed_run_after = self.run_after # type: ignore
+ elif isinstance(self.run_after, str):
+ parsed_run_after = timezone.parse(self.run_after)
Review Comment:
It seems also worthwhile to have a common utils to handle the type across
`None | datetime | str` for both `run_after` and `logical_date`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]