This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 48bfb1a970 Merge all ECS executor configs following recursive python dict update (#37137)
48bfb1a970 is described below

commit 48bfb1a970f5b47ba1b385ad809b8324923ddf3e
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Feb 9 08:43:32 2024 -0800

    Merge all ECS executor configs following recursive python dict update (#37137)
    
    Also document the behaviour and interaction between exec_config and
    run_task_kwargs config
---
 .../amazon/aws/executors/ecs/ecs_executor.py       |  22 ++-
 .../executors/ecs-executor.rst                     |   8 +-
 .../amazon/aws/executors/ecs/test_ecs_executor.py  | 218 +++++++++++++++++++++
 3 files changed, 239 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py 
b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index 2f0564ed9a..e6594e270f 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -48,6 +48,7 @@ from 
airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry impo
 )
 from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.utils import timezone
+from airflow.utils.helpers import merge_dicts
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
@@ -408,8 +409,8 @@ class AwsEcsExecutor(BaseExecutor):
         The command and executor config will be placed in the 
container-override
         section of the JSON request before calling Boto3's "run_task" function.
         """
-        run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
-        boto_run_task = self.ecs.run_task(**run_task_api)
+        run_task_kwargs = self._run_task_kwargs(task_id, cmd, queue, 
exec_config)
+        boto_run_task = self.ecs.run_task(**run_task_kwargs)
         run_task_response = BotoRunTaskSchema().load(boto_run_task)
         return run_task_response
 
@@ -421,17 +422,17 @@ class AwsEcsExecutor(BaseExecutor):
 
         One last chance to modify Boto3's "run_task" kwarg params before it 
gets passed into the Boto3 client.
         """
-        run_task_api = deepcopy(self.run_task_kwargs)
-        container_override = 
self.get_container(run_task_api["overrides"]["containerOverrides"])
+        run_task_kwargs = deepcopy(self.run_task_kwargs)
+        run_task_kwargs = merge_dicts(run_task_kwargs, exec_config)
+        container_override = 
self.get_container(run_task_kwargs["overrides"]["containerOverrides"])
         container_override["command"] = cmd
-        container_override.update(exec_config)
 
         # Inject the env variable to configure logging for containerized 
execution environment
         if "environment" not in container_override:
             container_override["environment"] = []
         container_override["environment"].append({"name": 
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})
 
-        return run_task_api
+        return run_task_kwargs
 
     def execute_async(self, key: TaskInstanceKey, command: CommandType, 
queue=None, executor_config=None):
         """Save the task to be executed in the next sync by inserting the 
commands into a queue."""
@@ -484,6 +485,11 @@ class AwsEcsExecutor(BaseExecutor):
     def get_container(self, container_list):
         """Searches task list for core Airflow container."""
         for container in container_list:
-            if container["name"] == self.container_name:
-                return container
+            try:
+                if container["name"] == self.container_name:
+                    return container
+            except KeyError:
+                raise EcsExecutorException(
+                    'container "name" must be provided in "containerOverrides" 
configuration'
+                )
         raise KeyError(f"No such container found by container name: 
{self.container_name}")
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst 
b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
index a6062a6304..d8d3764f5e 100644
--- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -73,6 +73,9 @@ In the case of conflicts, the order of precedence from lowest 
to highest is:
 3. Load any values provided in the RUN_TASK_KWARGS option if one is
    provided.
 
+.. note::
+   ``exec_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the ECS Executor it represents a ``run_task_kwargs`` configuration which is then updated over-top of the ``run_task_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``run_task_kwargs.update(exec_config)``
+
 Required config options:
 ~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -88,7 +91,7 @@ Optional config options:
 
 -  ASSIGN_PUBLIC_IP - Whether to assign a public IP address to the
    containers launched by the ECS executor. Defaults to "False".
--  CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
+-  AWS_CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
    executor to make API calls to AWS ECS. Defaults to "aws_default".
 -  LAUNCH_TYPE - Launch type can either be 'FARGATE' OR 'EC2'. Defaults
    to "FARGATE".
@@ -113,6 +116,9 @@ For a more detailed description of available options, 
including type
 hints and examples, see the ``config_templates`` folder in the Amazon
 provider package.
 
+.. note::
+   ``exec_config`` is an optional parameter that can be provided to operators. It is a dictionary type and in the context of the ECS Executor it represents a ``run_task_kwargs`` configuration which is then updated over-top of the ``run_task_kwargs`` specified in Airflow config above (if present). It is a recursive update which essentially applies Python update to each nested dictionary in the configuration. Loosely approximated as: ``run_task_kwargs.update(exec_config)``
+
 .. _dockerfile_for_ecs_executor:
 
 Dockerfile for ECS Executor
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py 
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 04e7774555..8766659c05 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -34,6 +34,7 @@ from inflection import camelize
 
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
+from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.amazon.aws.executors.ecs import ecs_executor, 
ecs_executor_config
 from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoTaskSchema
 from airflow.providers.amazon.aws.executors.ecs.ecs_executor import (
@@ -1156,3 +1157,220 @@ class TestEcsExecutorConfig:
 
         task_kwargs = ecs_executor_config.build_task_kwargs()
         assert task_kwargs["launchType"] == "FARGATE"
+
+    @pytest.mark.parametrize(
+        "run_task_kwargs, exec_config, expected_result",
+        [
+            # No input run_task_kwargs or executor overrides
+            (
+                {},
+                {},
+                {
+                    "taskDefinition": "some-task-def",
+                    "launchType": "FARGATE",
+                    "cluster": "some-cluster",
+                    "platformVersion": "LATEST",
+                    "count": 1,
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "command": ["command"],
+                                "name": "container-name",
+                                "environment": [{"name": 
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"}],
+                            }
+                        ]
+                    },
+                    "networkConfiguration": {
+                        "awsvpcConfiguration": {
+                            "subnets": ["sub1", "sub2"],
+                            "securityGroups": ["sg1", "sg2"],
+                            "assignPublicIp": "DISABLED",
+                        }
+                    },
+                },
+            ),
+            # run_task_kwargs provided, not exec_config
+            (
+                {
+                    "startedBy": "Banana",
+                    "tags": [{"key": "FOO", "value": "BAR"}],
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "name": "container-name",
+                                "memory": 500,
+                                "cpu": 10,
+                                "environment": [{"name": "X", "value": "Y"}],
+                            }
+                        ]
+                    },
+                },
+                {},
+                {
+                    "startedBy": "Banana",
+                    "tags": [{"key": "FOO", "value": "BAR"}],
+                    "taskDefinition": "some-task-def",
+                    "launchType": "FARGATE",
+                    "cluster": "some-cluster",
+                    "platformVersion": "LATEST",
+                    "count": 1,
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "memory": 500,
+                                "cpu": 10,
+                                "command": ["command"],
+                                "name": "container-name",
+                                "environment": [
+                                    {"name": "X", "value": "Y"},
+                                    # Added by the ecs executor
+                                    {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", 
"value": "true"},
+                                ],
+                            }
+                        ]
+                    },
+                    # Added by the ecs executor
+                    "networkConfiguration": {
+                        "awsvpcConfiguration": {
+                            "subnets": ["sub1", "sub2"],
+                            "securityGroups": ["sg1", "sg2"],
+                            "assignPublicIp": "DISABLED",
+                        }
+                    },
+                },
+            ),
+            # exec_config provided, no run_task_kwargs
+            (
+                {},
+                {
+                    "startedBy": "Banana",
+                    "tags": [{"key": "FOO", "value": "BAR"}],
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "name": "container-name",
+                                "memory": 500,
+                                "cpu": 10,
+                                "environment": [{"name": "X", "value": "Y"}],
+                            }
+                        ]
+                    },
+                },
+                {
+                    "startedBy": "Banana",
+                    "tags": [{"key": "FOO", "value": "BAR"}],
+                    "taskDefinition": "some-task-def",
+                    "launchType": "FARGATE",
+                    "cluster": "some-cluster",
+                    "platformVersion": "LATEST",
+                    "count": 1,
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "memory": 500,
+                                "cpu": 10,
+                                "command": ["command"],
+                                "name": "container-name",
+                                "environment": [
+                                    {"name": "X", "value": "Y"},
+                                    # Added by the ecs executor
+                                    {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", 
"value": "true"},
+                                ],
+                            }
+                        ]
+                    },
+                    # Added by the ecs executor
+                    "networkConfiguration": {
+                        "awsvpcConfiguration": {
+                            "subnets": ["sub1", "sub2"],
+                            "securityGroups": ["sg1", "sg2"],
+                            "assignPublicIp": "DISABLED",
+                        }
+                    },
+                },
+            ),
+            # Both run_task_kwargs and executor_config provided. The latter 
should override the former,
+            # following a recursive python dict update strategy
+            (
+                {
+                    "startedBy": "Banana",
+                    "tags": [{"key": "FOO", "value": "BAR"}],
+                    "taskDefinition": "foobar",
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "name": "container-name",
+                                "memory": 500,
+                                "cpu": 10,
+                                "environment": [{"name": "X", "value": "Y"}],
+                            }
+                        ]
+                    },
+                },
+                {
+                    "startedBy": "Fish",
+                    "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value": 
"Z"}],
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "name": "container-name",
+                                "memory": 300,
+                                "environment": [{"name": "W", "value": "Z"}],
+                            }
+                        ]
+                    },
+                },
+                {
+                    # tags and startedBy are overridden by exec_config
+                    "startedBy": "Fish",
+                    # List types overwrite entirely, as python dict update 
would do
+                    "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value": 
"Z"}],
+                    # taskDefinition remains since it is not a list type and 
not overridden by exec config
+                    "taskDefinition": "foobar",
+                    "launchType": "FARGATE",
+                    "cluster": "some-cluster",
+                    "platformVersion": "LATEST",
+                    "count": 1,
+                    "overrides": {
+                        "containerOverrides": [
+                            {
+                                "memory": 300,
+                                # cpu is not present because it was missing 
from the container overrides in
+                                # the exec_config
+                                "command": ["command"],
+                                "name": "container-name",
+                                "environment": [
+                                    # Overridden list type
+                                    {"name": "W", "value": "Z"},  # Only new 
env vars present, overwritten
+                                    # Added by the ecs executor
+                                    {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", 
"value": "true"},
+                                ],
+                            }
+                        ]
+                    },
+                    # Added by the ecs executor
+                    "networkConfiguration": {
+                        "awsvpcConfiguration": {
+                            "subnets": ["sub1", "sub2"],
+                            "securityGroups": ["sg1", "sg2"],
+                            "assignPublicIp": "DISABLED",
+                        }
+                    },
+                },
+            ),
+        ],
+    )
+    def test_run_task_kwargs_exec_config_overrides(
+        self, set_env_vars, run_task_kwargs, exec_config, expected_result
+    ):
+        run_task_kwargs_env_key = 
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
+        os.environ[run_task_kwargs_env_key] = json.dumps(run_task_kwargs)
+
+        mock_ti_key = mock.Mock(spec=TaskInstanceKey)
+        command = ["command"]
+
+        executor = AwsEcsExecutor()
+
+        final_run_task_kwargs = executor._run_task_kwargs(mock_ti_key, 
command, "queue", exec_config)
+
+        assert final_run_task_kwargs == expected_result

Reply via email to