KoviAnusha commented on code in PR #57369:
URL: https://github.com/apache/airflow/pull/57369#discussion_r2467937895
##########
providers/standard/src/airflow/providers/standard/triggers/external_task.py:
##########
@@ -129,39 +129,37 @@ async def run(self) -> typing.AsyncIterator[TriggerEvent]:
self.log.info("Sleeping for %s seconds", self.poke_interval)
await asyncio.sleep(self.poke_interval)
- async def _get_count_af_3(self, states):
+ async def _get_count_af_3(self, states: typing.Iterable[str] | None) ->
int:
from airflow.providers.standard.utils.sensor_helper import
_get_count_by_matched_states
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
- params = {
- "dag_id": self.external_dag_id,
- "logical_dates": self.logical_dates,
- "run_ids": self.run_ids,
- }
if self.external_task_ids:
count = await sync_to_async(RuntimeTaskInstance.get_ti_count)(
- task_ids=self.external_task_ids,
- states=states,
- **params,
+ dag_id=self.external_dag_id,
+ task_ids=list(self.external_task_ids),
+ logical_dates=self.logical_dates,
+ run_ids=self.run_ids,
+ states=list(states) if states else None,
)
- elif self.external_task_group_id:
+ return int(count / len(self.external_task_ids))
+ if self.external_task_group_id:
run_id_task_state_map = await
sync_to_async(RuntimeTaskInstance.get_task_states)(
+ dag_id=self.external_dag_id,
task_group_id=self.external_task_group_id,
- **params,
+ logical_dates=self.logical_dates,
+ run_ids=self.run_ids,
)
count = await sync_to_async(_get_count_by_matched_states)(
run_id_task_state_map=run_id_task_state_map,
- states=states,
+ states=list(states) if states else [],
)
Review Comment:
Thanks for catching that! I initially added the list conversion to handle
non-iterable cases, but switching the function to take a Container[str] sounds
cleaner. I’ll make that change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]