kaxil commented on code in PR #29260:
URL: https://github.com/apache/airflow/pull/29260#discussion_r1099137356
##########
airflow/example_dags/example_external_task_marker_dag.py:
##########
@@ -1,4 +1,3 @@
-#
Review Comment:
Unrelated change? I wonder why the static checks didn't fail.
##########
airflow/triggers/external_task.py:
##########
@@ -69,19 +79,50 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"states": self.states,
"execution_dates": self.execution_dates,
"poll_interval": self.poll_interval,
+ "trigger_start_time": self.trigger_start_time,
},
)
async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
"""
- Checks periodically in the database to see if the task exists, and has
- hit one of the states yet, or not.
+ Checks periodically in the database to see if the dag exists and is in
the running state. If found,
+ wait until the task specified will reach one of the expected states.
If dag with specified name was
+ not in the running state after _timeout_sec seconds after starting
execution process of the trigger,
+ terminate with status 'timeout'.
"""
while True:
- num_tasks = await self.count_tasks()
- if num_tasks == len(self.execution_dates):
- yield TriggerEvent(True)
- await asyncio.sleep(self.poll_interval)
+ try:
+ delta = utcnow() - self.trigger_start_time
+ if delta.total_seconds() < self._timeout_sec:
+ if await self.count_running_dags() == 0:
+ self.log.info("Waiting for DAG to start execution...")
+ await asyncio.sleep(self.poll_interval)
+ else:
+ yield TriggerEvent({"status": "timeout"})
+ return
Review Comment:
I don't think we need the `return` here.
##########
airflow/triggers/external_task.py:
##########
@@ -69,19 +79,50 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"states": self.states,
"execution_dates": self.execution_dates,
"poll_interval": self.poll_interval,
+ "trigger_start_time": self.trigger_start_time,
},
)
async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
"""
- Checks periodically in the database to see if the task exists, and has
- hit one of the states yet, or not.
+ Checks periodically in the database to see if the dag exists and is in
the running state. If found,
+ wait until the task specified will reach one of the expected states.
If dag with specified name was
+ not in the running state after _timeout_sec seconds after starting
execution process of the trigger,
+ terminate with status 'timeout'.
"""
while True:
- num_tasks = await self.count_tasks()
- if num_tasks == len(self.execution_dates):
- yield TriggerEvent(True)
- await asyncio.sleep(self.poll_interval)
+ try:
+ delta = utcnow() - self.trigger_start_time
+ if delta.total_seconds() < self._timeout_sec:
+ if await self.count_running_dags() == 0:
+ self.log.info("Waiting for DAG to start execution...")
+ await asyncio.sleep(self.poll_interval)
+ else:
+ yield TriggerEvent({"status": "timeout"})
+ return
+ if await self.count_tasks() == len(self.execution_dates):
+ yield TriggerEvent({"status": "success"})
+ return
+ self.log.info("Task is still running, sleeping for %s
seconds...", self.poll_interval)
+ await asyncio.sleep(self.poll_interval)
+ except Exception:
+ yield TriggerEvent({"status": "failed"})
+ return
Review Comment:
Same as above: I don't think we need the `return` here.
##########
airflow/triggers/external_task.py:
##########
@@ -69,19 +79,50 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"states": self.states,
"execution_dates": self.execution_dates,
"poll_interval": self.poll_interval,
+ "trigger_start_time": self.trigger_start_time,
},
)
async def run(self) -> typing.AsyncIterator["TriggerEvent"]:
"""
- Checks periodically in the database to see if the task exists, and has
- hit one of the states yet, or not.
+ Checks periodically in the database to see if the dag exists and is in
the running state. If found,
+ wait until the task specified will reach one of the expected states.
If dag with specified name was
+ not in the running state after _timeout_sec seconds after starting
execution process of the trigger,
+ terminate with status 'timeout'.
"""
while True:
- num_tasks = await self.count_tasks()
- if num_tasks == len(self.execution_dates):
- yield TriggerEvent(True)
- await asyncio.sleep(self.poll_interval)
+ try:
+ delta = utcnow() - self.trigger_start_time
+ if delta.total_seconds() < self._timeout_sec:
+ if await self.count_running_dags() == 0:
+ self.log.info("Waiting for DAG to start execution...")
+ await asyncio.sleep(self.poll_interval)
+ else:
+ yield TriggerEvent({"status": "timeout"})
+ return
+ if await self.count_tasks() == len(self.execution_dates):
+ yield TriggerEvent({"status": "success"})
+ return
Review Comment:
Same as above: I don't think we need the `return` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]