Lee-W commented on code in PR #39912:
URL: https://github.com/apache/airflow/pull/39912#discussion_r1684620020
##########
airflow/models/dagrun.py:
##########
@@ -1543,11 +1543,26 @@ def schedule_tis(
and not ti.task.outlets
):
dummy_ti_ids.append((ti.task_id, ti.map_index))
- elif ti.task.start_from_trigger is True and ti.task.start_trigger_args is not None:
- ti.start_date = timezone.utcnow()
- if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
- ti.try_number += 1
- ti.defer_task(exception=None, session=session)
+ # check "start_trigger_args" to see whether the operator supports start execution from triggerer
+ # if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
+ # this task.
+ # if not, we'll add this "ti" into "schedulable_ti_ids" and later execute it to run in the worker
+ elif ti.task.start_trigger_args is not None:
+ from airflow.models.mappedoperator import MappedOperator
+
+ if isinstance(ti.task, MappedOperator):
+ context = ti.get_template_context()
+ start_from_trigger = ti.task._expand_start_from_trigger(context=context, session=session)
+ else:
+ start_from_trigger = ti.task.start_from_trigger
Review Comment:
Sounds great! I just refactored it in this way and pushed it.
##########
airflow/models/taskinstance.py:
##########
@@ -1595,13 +1595,20 @@ def _defer_task(
next_kwargs = exception.kwargs
timeout = exception.timeout
elif ti.task is not None and ti.task.start_trigger_args is not None:
+ if isinstance(ti.task, MappedOperator):
+ context = ti.get_template_context()
+ start_trigger_args = ti.task._expand_start_trigger_args(context=context, session=session)
+ else:
+ start_trigger_args = ti.task.start_trigger_args
Review Comment:
Sounds great! I just refactored it in this way and pushed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]