pankajastro commented on code in PR #36916:
URL: https://github.com/apache/airflow/pull/36916#discussion_r1465998842


##########
airflow/triggers/external_task.py:
##########
@@ -36,6 +38,103 @@
     from airflow.utils.state import DagRunState
 
 
+class WorkflowTrigger(BaseTrigger):
+    """
+    A trigger to monitor tasks, task group and dag execution in Apache Airflow.
+
+    :param external_dag_id: The ID of the external DAG.
+    :param execution_dates: A list of execution dates for the external DAG.
+    :param external_task_ids: A collection of external task IDs to wait for.
+    :param external_task_group_id: The ID of the external task group to wait for.
+    :param failed_states: States considered as failed for external tasks.
+    :param skipped_states: States considered as skipped for external tasks.
+    :param allowed_states: States considered as successful for external tasks.
+    :param poke_interval: The interval (in seconds) for poking the external tasks.
+    :param soft_fail: If True, the trigger will not fail the entire DAG on external task failure.
+    """
+
+    def __init__(
+        self,
+        external_dag_id: str,
+        execution_dates: list,
+        external_task_ids: typing.Collection[str] | None = None,
+        external_task_group_id: str | None = None,
+        failed_states: typing.Iterable[str] | None = None,
+        skipped_states: typing.Iterable[str] | None = None,
+        allowed_states: typing.Iterable[str] | None = None,
+        poke_interval: float = 2.0,
+        soft_fail: bool = False,
+        **kwargs,
+    ):
+        self.external_dag_id = external_dag_id
+        self.external_task_ids = external_task_ids
+        self.external_task_group_id = external_task_group_id
+        self.failed_states = failed_states
+        self.skipped_states = skipped_states
+        self.allowed_states = allowed_states
+        self.execution_dates = execution_dates
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        super().__init__(**kwargs)
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize the trigger param and module path."""
+        return (
+            "airflow.triggers.external_task.WorkflowTrigger",
+            {
+                "external_dag_id": self.external_dag_id,
+                "external_task_ids": self.external_task_ids,
+                "external_task_group_id": self.external_task_group_id,
+                "failed_states": self.failed_states,
+                "skipped_states": self.skipped_states,
+                "allowed_states": self.allowed_states,
+                "execution_dates": self.execution_dates,
+                "poke_interval": self.poke_interval,
+                "soft_fail": self.soft_fail,
+            },
+        )
+
+    async def run(self) -> typing.AsyncIterator[TriggerEvent]:
+        """Check periodically tasks, task group or dag status."""
+        while True:
+            if self.failed_states:
+                count_failed = _get_count(
+                    self.execution_dates,
+                    self.external_task_ids,
+                    self.external_task_group_id,
+                    self.external_dag_id,
+                    self.failed_states,
+                )
+                if count_failed > 0:
+                    yield TriggerEvent({"status": "success"})
+                    return
+                else:
+                    yield TriggerEvent({"status": "success"})
+                    return
+            if self.skipped_states:
+                count_skipped = _get_count(
+                    self.execution_dates,
+                    self.external_task_ids,
+                    self.external_task_group_id,
+                    self.external_dag_id,
+                    self.skipped_states,
+                )
+                if count_skipped > 0:
+                    yield TriggerEvent({"status": "success"})

Review Comment:
   yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to