ashb commented on a change in pull request #16233:
URL: https://github.com/apache/airflow/pull/16233#discussion_r649131608
##########
File path: airflow/www/views.py
##########
@@ -1810,47 +1810,86 @@ def _mark_task_instance_state( # pylint:
disable=too-many-arguments
from airflow.api.common.experimental.mark_tasks import set_state
- if confirmed:
- with create_session() as session:
- altered = set_state(
- tasks=[task],
- execution_date=execution_date,
- upstream=upstream,
- downstream=downstream,
- future=future,
- past=past,
- state=state,
- commit=True,
- session=session,
- )
+ with create_session() as session:
+ altered = set_state(
+ tasks=[task],
+ execution_date=execution_date,
+ upstream=upstream,
+ downstream=downstream,
+ future=future,
+ past=past,
+ state=state,
+ commit=True,
+ session=session,
+ )
- # Clear downstream tasks that are in failed/upstream_failed
state to resume them.
- # Flush the session so that the tasks marked success are
reflected in the db.
- session.flush()
- subdag = dag.partial_subset(
- task_ids_or_regex={task_id},
- include_downstream=True,
- include_upstream=False,
- )
+ # Clear downstream tasks that are in failed/upstream_failed state
to resume them.
+ # Flush the session so that the tasks marked success are reflected
in the db.
+ session.flush()
+ subdag = dag.partial_subset(
+ task_ids_or_regex={task_id},
+ include_downstream=True,
+ include_upstream=False,
+ )
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
-
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ end_date = execution_date if not future else None
+ start_date = execution_date if not past else None
- session.commit()
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ # Exclude the task itself from being cleared
+ exclude_task_ids={task_id},
+ )
- flash(f"Marked {state} on {len(altered)} task instances")
- return redirect(origin)
+ session.commit()
+
+ flash(f"Marked {state} on {len(altered)} task instances")
+ return redirect(origin)
+
+ @expose('/confirm', methods=['GET'])
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ ]
+ )
+ @action_logging
+ def confirm(self):
+ """Show confirmation page for marking tasks as success or failed."""
+ dag_id = request.args.get('dag_id')
+ task_id = request.args.get('task_id')
+ execution_date = request.args.get('execution_date')
+ state = request.args.get('state')
+
+ upstream = request.args.get('failed_upstream') == "true"
+ downstream = request.args.get('failed_downstream') == "true"
+ future = request.args.get('failed_future') == "true"
+ past = request.args.get('failed_past') == "true"
Review comment:
Let's use the `airflow.utils.strings.to_boolean()` function here.
##########
File path: airflow/www/views.py
##########
@@ -1810,47 +1810,86 @@ def _mark_task_instance_state( # pylint:
disable=too-many-arguments
from airflow.api.common.experimental.mark_tasks import set_state
- if confirmed:
- with create_session() as session:
- altered = set_state(
- tasks=[task],
- execution_date=execution_date,
- upstream=upstream,
- downstream=downstream,
- future=future,
- past=past,
- state=state,
- commit=True,
- session=session,
- )
+ with create_session() as session:
+ altered = set_state(
+ tasks=[task],
+ execution_date=execution_date,
+ upstream=upstream,
+ downstream=downstream,
+ future=future,
+ past=past,
+ state=state,
+ commit=True,
+ session=session,
+ )
- # Clear downstream tasks that are in failed/upstream_failed
state to resume them.
- # Flush the session so that the tasks marked success are
reflected in the db.
- session.flush()
- subdag = dag.partial_subset(
- task_ids_or_regex={task_id},
- include_downstream=True,
- include_upstream=False,
- )
+ # Clear downstream tasks that are in failed/upstream_failed state
to resume them.
+ # Flush the session so that the tasks marked success are reflected
in the db.
+ session.flush()
+ subdag = dag.partial_subset(
+ task_ids_or_regex={task_id},
+ include_downstream=True,
+ include_upstream=False,
+ )
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
-
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ end_date = execution_date if not future else None
+ start_date = execution_date if not past else None
- session.commit()
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ # Exclude the task itself from being cleared
+ exclude_task_ids={task_id},
+ )
- flash(f"Marked {state} on {len(altered)} task instances")
- return redirect(origin)
+ session.commit()
+
+ flash(f"Marked {state} on {len(altered)} task instances")
+ return redirect(origin)
+
+ @expose('/confirm', methods=['GET'])
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ ]
+ )
+ @action_logging
+ def confirm(self):
+ """Show confirmation page for marking tasks as success or failed."""
+ dag_id = request.args.get('dag_id')
+ task_id = request.args.get('task_id')
+ execution_date = request.args.get('execution_date')
+ state = request.args.get('state')
+
+ upstream = request.args.get('failed_upstream') == "true"
+ downstream = request.args.get('failed_downstream') == "true"
+ future = request.args.get('failed_future') == "true"
+ past = request.args.get('failed_past') == "true"
+
+ dag = current_app.dag_bag.get_dag(dag_id)
+ task = dag.get_task(task_id)
+ task.dag = dag
+
+ if state not in (
+ 'success',
+ 'failed',
+ ):
+ flash(f"Invalid state {state}, must be either 'success' or
'failed'", "error")
+ return redirect(request.referrer or url_for('Airflow.index'))
+
+ latest_execution_date = dag.get_latest_execution_date()
+ if not latest_execution_date:
+ flash(f"Cannot make {state}, seem that dag {dag_id} has never
run", "error")
Review comment:
```suggestion
            flash(f"Cannot mark tasks as {state}; it seems that DAG {dag_id} has never run", "error")
```
##########
File path: airflow/www/views.py
##########
@@ -1810,47 +1810,86 @@ def _mark_task_instance_state( # pylint:
disable=too-many-arguments
from airflow.api.common.experimental.mark_tasks import set_state
- if confirmed:
- with create_session() as session:
- altered = set_state(
- tasks=[task],
- execution_date=execution_date,
- upstream=upstream,
- downstream=downstream,
- future=future,
- past=past,
- state=state,
- commit=True,
- session=session,
- )
+ with create_session() as session:
+ altered = set_state(
+ tasks=[task],
+ execution_date=execution_date,
+ upstream=upstream,
+ downstream=downstream,
+ future=future,
+ past=past,
+ state=state,
+ commit=True,
+ session=session,
+ )
- # Clear downstream tasks that are in failed/upstream_failed
state to resume them.
- # Flush the session so that the tasks marked success are
reflected in the db.
- session.flush()
- subdag = dag.partial_subset(
- task_ids_or_regex={task_id},
- include_downstream=True,
- include_upstream=False,
- )
+ # Clear downstream tasks that are in failed/upstream_failed state
to resume them.
+ # Flush the session so that the tasks marked success are reflected
in the db.
+ session.flush()
+ subdag = dag.partial_subset(
+ task_ids_or_regex={task_id},
+ include_downstream=True,
+ include_upstream=False,
+ )
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
-
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ end_date = execution_date if not future else None
+ start_date = execution_date if not past else None
- session.commit()
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ # Exclude the task itself from being cleared
+ exclude_task_ids={task_id},
+ )
- flash(f"Marked {state} on {len(altered)} task instances")
- return redirect(origin)
+ session.commit()
+
+ flash(f"Marked {state} on {len(altered)} task instances")
+ return redirect(origin)
+
+ @expose('/confirm', methods=['GET'])
+ @auth.has_access(
+ [
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ ]
+ )
+ @action_logging
+ def confirm(self):
+ """Show confirmation page for marking tasks as success or failed."""
+ dag_id = request.args.get('dag_id')
+ task_id = request.args.get('task_id')
+ execution_date = request.args.get('execution_date')
+ state = request.args.get('state')
+
+ upstream = request.args.get('failed_upstream') == "true"
+ downstream = request.args.get('failed_downstream') == "true"
+ future = request.args.get('failed_future') == "true"
+ past = request.args.get('failed_past') == "true"
+
+ dag = current_app.dag_bag.get_dag(dag_id)
Review comment:
The DAG might not be found here -- in that case we should return a 400/404.
The same applies when the task is not found in the DAG.
##########
File path: airflow/www/views.py
##########
@@ -1867,6 +1906,7 @@ def _mark_task_instance_state( # pylint:
disable=too-many-arguments
response = self.render_template(
"airflow/confirm.html",
+ endpoint=state,
Review comment:
```suggestion
endpoint=url_for(f'Airflow.{state}'),
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]