uranusjr commented on code in PR #57369:
URL: https://github.com/apache/airflow/pull/57369#discussion_r2467829016
##########
providers/standard/src/airflow/providers/standard/triggers/external_task.py:
##########
@@ -129,39 +129,37 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
self.log.info("Sleeping for %s seconds", self.poke_interval)
await asyncio.sleep(self.poke_interval)
- async def _get_count_af_3(self, states):
+ async def _get_count_af_3(self, states: typing.Iterable[str] | None) -> int:
from airflow.providers.standard.utils.sensor_helper import _get_count_by_matched_states
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
- params = {
- "dag_id": self.external_dag_id,
- "logical_dates": self.logical_dates,
- "run_ids": self.run_ids,
- }
if self.external_task_ids:
count = await sync_to_async(RuntimeTaskInstance.get_ti_count)(
- task_ids=self.external_task_ids,
- states=states,
- **params,
+ dag_id=self.external_dag_id,
+ task_ids=list(self.external_task_ids),
+ logical_dates=self.logical_dates,
+ run_ids=self.run_ids,
+ states=list(states) if states else None,
)
- elif self.external_task_group_id:
+ return int(count / len(self.external_task_ids))
+ if self.external_task_group_id:
run_id_task_state_map = await sync_to_async(RuntimeTaskInstance.get_task_states)(
+ dag_id=self.external_dag_id,
task_group_id=self.external_task_group_id,
- **params,
+ logical_dates=self.logical_dates,
+ run_ids=self.run_ids,
)
count = await sync_to_async(_get_count_by_matched_states)(
run_id_task_state_map=run_id_task_state_map,
- states=states,
+ states=list(states) if states else [],
)
Review Comment:
Instead of converting `states` to a list here, `_get_count_by_matched_states`
should just be changed to accept a `Container[str]`, since it doesn’t really need a list.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]