uranusjr commented on code in PR #32000:
URL: https://github.com/apache/airflow/pull/32000#discussion_r1282727578
##########
airflow/models/dag.py:
##########
@@ -2738,22 +2738,48 @@ def add_logger_if_needed(ti: TaskInstance):
)
tasks = self.task_dict
+
+ task_try_numbers: dict[tuple[str, int], int] =
collections.defaultdict(int)
+
self.log.debug("starting dagrun")
# Instead of starting a scheduler, we run the minimal loop possible to
check
# for task readiness and dependency management. This is notably faster
# than creating a BackfillJob and allows us to surface logs to the user
while dr.state == DagRunState.RUNNING:
schedulable_tis, _ = dr.update_state(session=session)
- try:
- for ti in schedulable_tis:
+
+ for ti in schedulable_tis:
+ try:
add_logger_if_needed(ti)
ti.task = tasks[ti.task_id]
_run_task(ti, session=session)
- except Exception:
- self.log.info(
- "Task failed. DAG will continue to run until finished and
be marked as failed.",
- exc_info=True,
- )
+ except Exception:
+ if ti.state == TaskInstanceState.UP_FOR_RETRY:
+ try_number = task_try_numbers[ti.task_id, ti.map_index]
+ if try_number > ti.max_tries:
+ ti.set_state(TaskInstanceState.FAILED)
+ else:
+ task_try_numbers[ti.task_id, ti.map_index] =
try_number + 1
+ self.log.info(
+ "Task failed. DAG will continue to run until finished
and be marked as failed.",
+ exc_info=True,
+ )
+ for ti in dr.get_task_instances(session=session,
state=TaskInstanceState.SCHEDULED):
Review Comment:
This additional loop allows `test()` to wait for tasks in the deferred state.
When they are resumed (by the triggerer), they will be picked up by this loop
and executed. The function will appear to hang while the tasks are still in the
deferred state because it’s waiting for them to end.
##########
airflow/models/dag.py:
##########
@@ -2738,22 +2738,48 @@ def add_logger_if_needed(ti: TaskInstance):
)
tasks = self.task_dict
+
+ task_try_numbers: dict[tuple[str, int], int] =
collections.defaultdict(int)
+
self.log.debug("starting dagrun")
# Instead of starting a scheduler, we run the minimal loop possible to
check
# for task readiness and dependency management. This is notably faster
# than creating a BackfillJob and allows us to surface logs to the user
while dr.state == DagRunState.RUNNING:
schedulable_tis, _ = dr.update_state(session=session)
- try:
- for ti in schedulable_tis:
+
+ for ti in schedulable_tis:
+ try:
add_logger_if_needed(ti)
ti.task = tasks[ti.task_id]
_run_task(ti, session=session)
- except Exception:
- self.log.info(
- "Task failed. DAG will continue to run until finished and
be marked as failed.",
- exc_info=True,
- )
+ except Exception:
+ if ti.state == TaskInstanceState.UP_FOR_RETRY:
+ try_number = task_try_numbers[ti.task_id, ti.map_index]
+ if try_number > ti.max_tries:
+ ti.set_state(TaskInstanceState.FAILED)
+ else:
+ task_try_numbers[ti.task_id, ti.map_index] =
try_number + 1
+ self.log.info(
+ "Task failed. DAG will continue to run until finished
and be marked as failed.",
+ exc_info=True,
+ )
+ for ti in dr.get_task_instances(session=session,
state=TaskInstanceState.SCHEDULED):
Review Comment:
This additional loop allows `test()` to wait for tasks in the deferred state.
When they are resumed (by the triggerer), they will be picked up by this loop
and executed. The function will appear to hang while the tasks are still in the
deferred state because it’s waiting for them to end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]