stevenschaerer commented on code in PR #38689:
URL: https://github.com/apache/airflow/pull/38689#discussion_r1552241684
##########
airflow/triggers/external_task.py:
##########
@@ -98,44 +99,43 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""Check periodically tasks, task group or dag status."""
while True:
if self.failed_states:
- failed_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.failed_states,
- )
+ failed_count = await self._get_count(self.failed_states)
if failed_count > 0:
yield TriggerEvent({"status": "failed"})
return
else:
yield TriggerEvent({"status": "success"})
return
if self.skipped_states:
- skipped_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.skipped_states,
- )
+ skipped_count = await self._get_count(self.skipped_states)
if skipped_count > 0:
yield TriggerEvent({"status": "skipped"})
return
- allowed_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.allowed_states,
- )
+ allowed_count = await self._get_count(self.allowed_states)
if allowed_count == len(self.execution_dates):
yield TriggerEvent({"status": "success"})
return
self.log.info("Sleeping for %s seconds", self.poke_interval)
await asyncio.sleep(self.poke_interval)
+ @sync_to_async
+ def _get_count(self, states: typing.Iterable[str] | None) -> int:
+ """
+ Get the count of records against dttm filter and states. Async wrapper for _get_count.
+
+ :param states: task or dag states
+ :return The count of records.
+ """
+ return _get_count(
+ dttm_filter=self.execution_dates,
+ external_task_ids=self.external_task_ids,
+ external_task_group_id=self.external_task_group_id,
+ external_dag_id=self.external_dag_id,
+ states=states,
+ )
+
+@deprecated(reason=("TaskStateTrigger has been deprecated and will be removed in future."))
Review Comment:
Thanks! This is done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]