ljades commented on issue #19264:
URL: https://github.com/apache/airflow/issues/19264#issuecomment-954056035


   > Hey @ljades,
   > 
   > I looked into this, but couldn't replicate it on current main branch. Do you have example dag(s) code to repro the issue?
   
   Sure! Here's an excerpt based on the code in my own application (I left out company-specific details and stitched together functions from different modules).
   
   ```
   def get_docker_build_task(dag: DAG, image_name: str) -> DockerBuildPodOperator:
       """Return the operator that builds the Docker image ``image_name``.

       The build context is a git URL and the Dockerfile path is ``Dockerfile``.
       The task id is fixed (``"...-docker-build"``) and is attached to ``dag``.
       """
       context_url = "...".format(...)
       docker_build_args = {...}
       # Improve completion time of docker build
       build_resources = get_docker_build_resource_requirements()

       return DockerBuildPodOperator(
           dag=dag,
           task_id="...-docker-build",
           image_name=image_name,
           build_context=context_url,
           build_args=docker_build_args,
           dockerfile_path="Dockerfile",
           resources=build_resources,
       )
   
   def get_build_short_circuit_skip_task(
       dag: DAG, build_task_id: str = "...-docker-build"
   ) -> ShortCircuitOperator:
       """Return a ShortCircuitOperator that skips all following tasks if the worker build fails.

       The operator uses the ``all_done`` trigger rule, so it still runs after the
       build task has finished in any state (including failure) and can then
       short-circuit the rest of the DAG.

       :param dag: DAG the operator is attached to.
       :param build_task_id: task_id of the docker build task whose state is
           checked. Defaults to the same id that ``get_docker_build_task`` uses.
       :return: the ShortCircuitOperator with task_id ``short-on-build-fail``.
       """

       # Check if the docker build worker task failed
       def is_worker_build_succeeded(**kwargs: Any) -> bool:
           build_ti = kwargs["dag_run"].get_task_instance(build_task_id)
           # DagRun.get_task_instance returns None when no matching task instance
           # exists; treat that as "build did not succeed" (skip downstream)
           # rather than crashing this task with an AttributeError.
           return build_ti is not None and build_ti.state == State.SUCCESS

       return ShortCircuitOperator(
           task_id="short-on-build-fail",
           python_callable=is_worker_build_succeeded,
           trigger_rule="all_done",
           dag=dag,
       )
   
   
   def get_trigger_dag_run_and_wait_task(dag: DAG, dag_id: str) -> TriggerDagRunOperator:
       """Return a TriggerDagRunOperator that triggers ``dag_id`` and waits for it.

       The triggered run receives an empty ``conf`` and the templated
       ``execution_date`` of the current run, and ``wait_for_completion`` is on.
       """
       return TriggerDagRunOperator(
           task_id=f"trigger_and_wait_{dag_id}",
           trigger_dag_id=dag_id,
           conf={},
           wait_for_completion=True,
           # The default trigger rule is "all_success" (run only if every upstream
           # task succeeded). "all_done" instead runs once the upstream tasks are
           # done, regardless of their final status.
           trigger_rule="all_done",
           # Added to patch the problem: without it, the external link doesn't
           # link properly.
           execution_date="{{execution_date}}",
           dag=dag,
       )
   
   
   def get_fail_if_task(dag: DAG, fail_condition: str = "all_failed") -> BashOperator:
       """Return a trivial sleep task that acts as a failure sentinel.

       The task uses the ``none_failed_or_skipped`` trigger rule. It runs (sleeping
       for 2 seconds) only when no upstream task has failed and at least one
       upstream task has succeeded. Skipped upstream tasks are allowed, so this is
       *not* "all upstream tasks succeeded". If an upstream task fails, this task
       does not run and ends up ``upstream_failed`` instead.

       :param dag: DAG the task is attached to.
       :param fail_condition: label used only to build the task name
           ``fail_for_<fail_condition>``; it does not change the trigger logic.
       """
       # NOTE(review): "none_failed_or_skipped" was renamed to
       # "none_failed_min_one_success" in Airflow 2.2; switch once the deployment
       # is on >= 2.2. Also assumes get_simple_sleep_task returns a BashOperator,
       # as the annotation says; TODO confirm.
       return get_simple_sleep_task(
           dag=dag,
           task_name=f"fail_for_{fail_condition}",
           trigger_rule="none_failed_or_skipped",
           sleep_seconds="2",
       )
   
   
   def get_run_all_subdag() -> DAG:
       """Return a DAG that sequentially runs all other relevant DAGs.

       Task layout::

           build >> short_circuit >> trigger_1 >> trigger_2 >> ... >> trigger_n
           build >> fail_if_build_failed
           trigger_k >> fail_for_any_subdag          (for every trigger_k)
       """
       dag = DAG(
           dag_id="trigger_OTHERS",
           default_args=...,
           schedule_interval=None,
           tags=[...],
       )

       image_name = "..."
       # Always add the build task for a full run
       build = get_docker_build_task(dag, image_name)
       short_circuit = get_build_short_circuit_skip_task(dag)
       # The short circuit skip will skip the fail status for all downstream
       # tasks, so we need extra status checkers to make sure that doesn't happen.
       # Two potential fail points: if the docker build fails, and if any dag
       # fails. We need two separate ones because if the build failure status
       # checker is downstream from the short circuit, it will short the status
       # checker too.
       fail_if_build_failed = get_fail_if_task(dag, fail_condition="build")
       fail_for_any_subdag = get_fail_if_task(dag, fail_condition="any_subdag")

       build >> short_circuit
       build >> fail_if_build_failed

       # Chain one trigger task per DAG id. The first one hangs off the short
       # circuit, each later one runs after the previous trigger, and every
       # trigger feeds the "any_subdag" failure checker.
       previous_task = short_circuit
       for dag_id in ["my", "list", "of", "dag", "ids", "..."]:
           subdag_trigger_task = get_trigger_dag_run_and_wait_task(dag=dag, dag_id=dag_id)
           subdag_trigger_task.set_upstream(previous_task)
           subdag_trigger_task.set_downstream(fail_for_any_subdag)
           previous_task = subdag_trigger_task

       return dag


   # Module-level reference to the DAG object (presumably so the Airflow DAG file
   # loader discovers it; confirm against the deployment's DAG folder setup).
   run_all_subdag = get_run_all_subdag()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to