mshober commented on issue #35490:
URL: https://github.com/apache/airflow/issues/35490#issuecomment-1806100741
Thanks for starting this @o-nikolas!
It is important to note that you can use `propagateTags=TASK_DEFINITION` as
a way of setting default tags on all the tasks that Airflow launches. I haven't
implemented tagging for my own environment yet, but that would be my approach
to global tagging. That would render setting tags via `run_task_kwargs`
obsolete, and then tags set with `executor_config` could always be the complete
set of tags that are passed to the RunTask API.
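A minimal sketch of that idea, not something I have running: `build_run_task_kwargs` and a plain `tags` dict inside `executor_config` are hypothetical names used only for illustration. `propagateTags` and the `key`/`value` tag shape are the RunTask request fields.

```python
from copy import deepcopy


def build_run_task_kwargs(base_kwargs: dict, executor_config: dict) -> dict:
    """Build RunTask kwargs whose explicit tags come only from executor_config.

    Hypothetical helper: the "tags" key in executor_config is an assumption.
    """
    kwargs = deepcopy(base_kwargs)
    # Default tags live on the task definition; ECS copies them to the task.
    kwargs["propagateTags"] = "TASK_DEFINITION"
    tags = executor_config.get("tags")
    if tags:
        # RunTask expects tags as a list of {"key": ..., "value": ...} dicts.
        kwargs["tags"] = [{"key": k, "value": v} for k, v in tags.items()]
    else:
        # Ignore any tags left over in the global kwargs.
        kwargs.pop("tags", None)
    return kwargs
```

With this shape, the global config never needs to carry tags at all: defaults come from the task definition and per-task tags come from `executor_config`.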
I'm also personally not a huge fan of the `RUN_TASK_KWARGS` config option.
We already have explicit config options for several attributes (cluster,
networkConfiguration, taskDefinition, platformVersion, launchType). I'd rather
have more of the same. If we dictate which attributes can be set at the config
level, then handling the `executor_config` becomes much simpler. For example, I
don't see the need to support `overrides` as a global config option
when all of those properties can be set in the task definition.
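One way the "dictate what can be set" idea could look on the `executor_config` side is a simple allow-list check. This is only a sketch under my own assumptions: `ALLOWED_EXECUTOR_CONFIG_KEYS` and `validate_executor_config` are hypothetical names, and the key set is just the one my own implementation happens to read.

```python
# Hypothetical allow-list; the real set of supported keys is still up for discussion.
ALLOWED_EXECUTOR_CONFIG_KEYS = frozenset(
    {
        "capacity_provider",
        "cpu_reservation",
        "memory_reservation_mib",
        "memory_limit_mib",
    }
)


def validate_executor_config(executor_config: dict) -> dict:
    """Return a copy of executor_config, rejecting keys outside the allow-list."""
    unknown = sorted(set(executor_config) - ALLOWED_EXECUTOR_CONFIG_KEYS)
    if unknown:
        raise ValueError(f"Unsupported executor_config keys: {', '.join(unknown)}")
    return dict(executor_config)
```

Because every accepted key is known up front, each one can be mapped to exactly one RunTask field without any generic dict merging.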
> As well as from @mshober who has an implementation of this, so I'd be
interested to hear what approach they took and how it's working out for them:
My implementation is very specific and abstracted to my use case:
```python
def _run_task_kwargs(self, task_id: TaskInstanceKeyType, cmd: CommandType,
                     queue: str, exec_config: ExecutorConfigType) -> dict:
    capacity_provider = exec_config.get("capacity_provider")
    memory_reservation_mib: int | None = exec_config.get("memory_reservation_mib")
    memory_limit_mib: int | None = exec_config.get("memory_limit_mib")
    cpu_reservation: int | None = exec_config.get("cpu_reservation")
    run_task_api = deepcopy(self.run_task_kwargs)
    container_override = self.get_container(run_task_api["overrides"])
    container_override["command"] = cmd
    if cpu_reservation is not None:
        container_override["cpu"] = int(cpu_reservation)
    if memory_reservation_mib is not None:
        container_override["memoryReservation"] = int(memory_reservation_mib)
    if memory_limit_mib is not None:
        container_override["memory"] = int(memory_limit_mib)
    if capacity_provider:
        run_task_api["capacityProviderStrategy"] = [{"capacityProvider": capacity_provider}]
    return run_task_api
```
so my code is likely not too helpful. I'll spend some time thinking about
how it can be improved to fit all use cases.