Taragolis commented on code in PR #38689:
URL: https://github.com/apache/airflow/pull/38689#discussion_r1552121352
##########
airflow/triggers/external_task.py:
##########
@@ -98,44 +99,43 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""Check periodically tasks, task group or dag status."""
while True:
if self.failed_states:
- failed_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.failed_states,
- )
+ failed_count = await self._get_count(self.failed_states)
if failed_count > 0:
yield TriggerEvent({"status": "failed"})
return
else:
yield TriggerEvent({"status": "success"})
return
if self.skipped_states:
- skipped_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.skipped_states,
- )
+ skipped_count = await self._get_count(self.skipped_states)
if skipped_count > 0:
yield TriggerEvent({"status": "skipped"})
return
- allowed_count = _get_count(
- self.execution_dates,
- self.external_task_ids,
- self.external_task_group_id,
- self.external_dag_id,
- self.allowed_states,
- )
+ allowed_count = await self._get_count(self.allowed_states)
if allowed_count == len(self.execution_dates):
yield TriggerEvent({"status": "success"})
return
self.log.info("Sleeping for %s seconds", self.poke_interval)
await asyncio.sleep(self.poke_interval)
+ @sync_to_async
+ def _get_count(self, states: typing.Iterable[str] | None) -> int:
+ """
+ Get the count of records against dttm filter and states. Async wrapper
for _get_count.
+
+ :param states: task or dag states
+ :return The count of records.
+ """
+ return _get_count(
+ dttm_filter=self.execution_dates,
+ external_task_ids=self.external_task_ids,
+ external_task_group_id=self.external_task_group_id,
+ external_dag_id=self.external_dag_id,
+ states=states,
+ )
+
+@deprecated(reason=("TaskStateTrigger has been deprecated and will be removed
in future."))
Review Comment:
Just found why tests around this class not failed
```suggestion
@deprecated(
reason="TaskStateTrigger has been deprecated and will be removed in
future.",
category=RemovedInAirflow3Warning,
)
```
And this is required to [deal with
warnings](https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#handling-warnings)
into the
`tests/triggers/test_external_task.py::test_external_task.py::TestTaskStateTrigger`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]