This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48bfb1a970 Merge all ECS executor configs following recursive python
dict update (#37137)
48bfb1a970 is described below
commit 48bfb1a970f5b47ba1b385ad809b8324923ddf3e
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Feb 9 08:43:32 2024 -0800
Merge all ECS executor configs following recursive python dict update
(#37137)
Also document the behaviour and interaction between exec_config and
run_task_kwargs config
---
.../amazon/aws/executors/ecs/ecs_executor.py | 22 ++-
.../executors/ecs-executor.rst | 8 +-
.../amazon/aws/executors/ecs/test_ecs_executor.py | 218 +++++++++++++++++++++
3 files changed, 239 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index 2f0564ed9a..e6594e270f 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -48,6 +48,7 @@ from
airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry import (
)
from airflow.providers.amazon.aws.hooks.ecs import EcsHook
from airflow.utils import timezone
+from airflow.utils.helpers import merge_dicts
from airflow.utils.state import State
if TYPE_CHECKING:
@@ -408,8 +409,8 @@ class AwsEcsExecutor(BaseExecutor):
The command and executor config will be placed in the
container-override
section of the JSON request before calling Boto3's "run_task" function.
"""
- run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
- boto_run_task = self.ecs.run_task(**run_task_api)
+ run_task_kwargs = self._run_task_kwargs(task_id, cmd, queue,
exec_config)
+ boto_run_task = self.ecs.run_task(**run_task_kwargs)
run_task_response = BotoRunTaskSchema().load(boto_run_task)
return run_task_response
@@ -421,17 +422,17 @@ class AwsEcsExecutor(BaseExecutor):
One last chance to modify Boto3's "run_task" kwarg params before it
gets passed into the Boto3 client.
"""
- run_task_api = deepcopy(self.run_task_kwargs)
- container_override =
self.get_container(run_task_api["overrides"]["containerOverrides"])
+ run_task_kwargs = deepcopy(self.run_task_kwargs)
+ run_task_kwargs = merge_dicts(run_task_kwargs, exec_config)
+ container_override =
self.get_container(run_task_kwargs["overrides"]["containerOverrides"])
container_override["command"] = cmd
- container_override.update(exec_config)
# Inject the env variable to configure logging for containerized
execution environment
if "environment" not in container_override:
container_override["environment"] = []
container_override["environment"].append({"name":
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})
- return run_task_api
+ return run_task_kwargs
def execute_async(self, key: TaskInstanceKey, command: CommandType,
queue=None, executor_config=None):
"""Save the task to be executed in the next sync by inserting the
commands into a queue."""
@@ -484,6 +485,11 @@ class AwsEcsExecutor(BaseExecutor):
def get_container(self, container_list):
"""Searches task list for core Airflow container."""
for container in container_list:
- if container["name"] == self.container_name:
- return container
+ try:
+ if container["name"] == self.container_name:
+ return container
+ except KeyError:
+ raise EcsExecutorException(
+ 'container "name" must be provided in "containerOverrides"
configuration'
+ )
raise KeyError(f"No such container found by container name:
{self.container_name}")
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
index a6062a6304..d8d3764f5e 100644
--- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -73,6 +73,9 @@ In the case of conflicts, the order of precedence from lowest
to highest is:
3. Load any values provided in the RUN_TASK_KWARGS option if one is
provided.
+.. note::
+ ``exec_config`` is an optional parameter that can be provided to operators.
It is a dictionary type and in the context of the ECS Executor it represents a
``run_task_kwargs`` configuration which is then updated over-top of the
``run_task_kwargs`` specified in Airflow config above (if present). It is a
recursive update which essentially applies a Python ``dict.update()`` to each
nested dictionary in the configuration. Loosely approximated as:
``run_task_kwargs.update(exec_config)``
+
Required config options:
~~~~~~~~~~~~~~~~~~~~~~~~
@@ -88,7 +91,7 @@ Optional config options:
- ASSIGN_PUBLIC_IP - Whether to assign a public IP address to the
containers launched by the ECS executor. Defaults to "False".
-- CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
+- AWS_CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
executor to make API calls to AWS ECS. Defaults to "aws_default".
- LAUNCH_TYPE - Launch type can either be 'FARGATE' OR 'EC2'. Defaults
to "FARGATE".
@@ -113,6 +116,9 @@ For a more detailed description of available options,
including type
hints and examples, see the ``config_templates`` folder in the Amazon
provider package.
+.. note::
+ ``exec_config`` is an optional parameter that can be provided to operators.
It is a dictionary type and in the context of the ECS Executor it represents a
``run_task_kwargs`` configuration which is then updated over-top of the
``run_task_kwargs`` specified in Airflow config above (if present). It is a
recursive update which essentially applies a Python ``dict.update()`` to each
nested dictionary in the configuration. Loosely approximated as:
``run_task_kwargs.update(exec_config)``
+
.. _dockerfile_for_ecs_executor:
Dockerfile for ECS Executor
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 04e7774555..8766659c05 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -34,6 +34,7 @@ from inflection import camelize
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
+from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.amazon.aws.executors.ecs import ecs_executor,
ecs_executor_config
from airflow.providers.amazon.aws.executors.ecs.boto_schema import
BotoTaskSchema
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import (
@@ -1156,3 +1157,220 @@ class TestEcsExecutorConfig:
task_kwargs = ecs_executor_config.build_task_kwargs()
assert task_kwargs["launchType"] == "FARGATE"
+
+ @pytest.mark.parametrize(
+ "run_task_kwargs, exec_config, expected_result",
+ [
+ # No input run_task_kwargs or executor overrides
+ (
+ {},
+ {},
+ {
+ "taskDefinition": "some-task-def",
+ "launchType": "FARGATE",
+ "cluster": "some-cluster",
+ "platformVersion": "LATEST",
+ "count": 1,
+ "overrides": {
+ "containerOverrides": [
+ {
+ "command": ["command"],
+ "name": "container-name",
+ "environment": [{"name":
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"}],
+ }
+ ]
+ },
+ "networkConfiguration": {
+ "awsvpcConfiguration": {
+ "subnets": ["sub1", "sub2"],
+ "securityGroups": ["sg1", "sg2"],
+ "assignPublicIp": "DISABLED",
+ }
+ },
+ },
+ ),
+ # run_task_kwargs provided, not exec_config
+ (
+ {
+ "startedBy": "Banana",
+ "tags": [{"key": "FOO", "value": "BAR"}],
+ "overrides": {
+ "containerOverrides": [
+ {
+ "name": "container-name",
+ "memory": 500,
+ "cpu": 10,
+ "environment": [{"name": "X", "value": "Y"}],
+ }
+ ]
+ },
+ },
+ {},
+ {
+ "startedBy": "Banana",
+ "tags": [{"key": "FOO", "value": "BAR"}],
+ "taskDefinition": "some-task-def",
+ "launchType": "FARGATE",
+ "cluster": "some-cluster",
+ "platformVersion": "LATEST",
+ "count": 1,
+ "overrides": {
+ "containerOverrides": [
+ {
+ "memory": 500,
+ "cpu": 10,
+ "command": ["command"],
+ "name": "container-name",
+ "environment": [
+ {"name": "X", "value": "Y"},
+ # Added by the ecs executor
+ {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER",
"value": "true"},
+ ],
+ }
+ ]
+ },
+ # Added by the ecs executor
+ "networkConfiguration": {
+ "awsvpcConfiguration": {
+ "subnets": ["sub1", "sub2"],
+ "securityGroups": ["sg1", "sg2"],
+ "assignPublicIp": "DISABLED",
+ }
+ },
+ },
+ ),
+ # exec_config provided, no run_task_kwargs
+ (
+ {},
+ {
+ "startedBy": "Banana",
+ "tags": [{"key": "FOO", "value": "BAR"}],
+ "overrides": {
+ "containerOverrides": [
+ {
+ "name": "container-name",
+ "memory": 500,
+ "cpu": 10,
+ "environment": [{"name": "X", "value": "Y"}],
+ }
+ ]
+ },
+ },
+ {
+ "startedBy": "Banana",
+ "tags": [{"key": "FOO", "value": "BAR"}],
+ "taskDefinition": "some-task-def",
+ "launchType": "FARGATE",
+ "cluster": "some-cluster",
+ "platformVersion": "LATEST",
+ "count": 1,
+ "overrides": {
+ "containerOverrides": [
+ {
+ "memory": 500,
+ "cpu": 10,
+ "command": ["command"],
+ "name": "container-name",
+ "environment": [
+ {"name": "X", "value": "Y"},
+ # Added by the ecs executor
+ {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER",
"value": "true"},
+ ],
+ }
+ ]
+ },
+ # Added by the ecs executor
+ "networkConfiguration": {
+ "awsvpcConfiguration": {
+ "subnets": ["sub1", "sub2"],
+ "securityGroups": ["sg1", "sg2"],
+ "assignPublicIp": "DISABLED",
+ }
+ },
+ },
+ ),
+ # Both run_task_kwargs and executor_config provided. The latter
should override the former,
+ # following a recursive python dict update strategy
+ (
+ {
+ "startedBy": "Banana",
+ "tags": [{"key": "FOO", "value": "BAR"}],
+ "taskDefinition": "foobar",
+ "overrides": {
+ "containerOverrides": [
+ {
+ "name": "container-name",
+ "memory": 500,
+ "cpu": 10,
+ "environment": [{"name": "X", "value": "Y"}],
+ }
+ ]
+ },
+ },
+ {
+ "startedBy": "Fish",
+ "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value":
"Z"}],
+ "overrides": {
+ "containerOverrides": [
+ {
+ "name": "container-name",
+ "memory": 300,
+ "environment": [{"name": "W", "value": "Z"}],
+ }
+ ]
+ },
+ },
+ {
+ # tags and startedBy are overridden by exec_config
+ "startedBy": "Fish",
+ # List types overwrite entirely, as python dict update
would do
+ "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value":
"Z"}],
+ # taskDefinition remains since it is not a list type and
not overridden by exec config
+ "taskDefinition": "foobar",
+ "launchType": "FARGATE",
+ "cluster": "some-cluster",
+ "platformVersion": "LATEST",
+ "count": 1,
+ "overrides": {
+ "containerOverrides": [
+ {
+ "memory": 300,
+ # cpu is not present because it was missing
from the container overrides in
+ # the exec_config
+ "command": ["command"],
+ "name": "container-name",
+ "environment": [
+ # Overridden list type
+ {"name": "W", "value": "Z"}, # Only new
env vars present, overwritten
+ # Added by the ecs executor
+ {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER",
"value": "true"},
+ ],
+ }
+ ]
+ },
+ # Added by the ecs executor
+ "networkConfiguration": {
+ "awsvpcConfiguration": {
+ "subnets": ["sub1", "sub2"],
+ "securityGroups": ["sg1", "sg2"],
+ "assignPublicIp": "DISABLED",
+ }
+ },
+ },
+ ),
+ ],
+ )
+ def test_run_task_kwargs_exec_config_overrides(
+ self, set_env_vars, run_task_kwargs, exec_config, expected_result
+ ):
+ run_task_kwargs_env_key =
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
+ os.environ[run_task_kwargs_env_key] = json.dumps(run_task_kwargs)
+
+ mock_ti_key = mock.Mock(spec=TaskInstanceKey)
+ command = ["command"]
+
+ executor = AwsEcsExecutor()
+
+ final_run_task_kwargs = executor._run_task_kwargs(mock_ti_key,
command, "queue", exec_config)
+
+ assert final_run_task_kwargs == expected_result