Taragolis commented on code in PR #34381:
URL: https://github.com/apache/airflow/pull/34381#discussion_r1328033206


##########
airflow/providers/amazon/aws/executors/ecs/ecs_executor_config.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor configuration.
+
+This is the configuration for calling the ECS ``run_task`` function. The AWS 
ECS Executor calls
+Boto3's ``run_task(**kwargs)`` function with the kwargs templated by this 
dictionary. See the URL
+below for documentation on the parameters accepted by the Boto3 run_task 
function.
+
+.. seealso::
+    
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
+
+"""
+
+from __future__ import annotations
+
+import json
+from copy import deepcopy
+from json import JSONDecodeError
+
+from airflow.configuration import conf
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_GROUP_NAME,
+    RUN_TASK_KWARG_DEFAULTS,
+    AllEcsConfigKeys,
+    RunTaskKwargsConfigKeys,
+    convert_dict_keys_camel_case,
+    parse_assign_public_ip,
+)
+from airflow.utils.helpers import prune_dict
+
+
+def _fetch_templated_kwargs() -> dict[str, str]:
+    run_task_kwargs_value = conf.get(CONFIG_GROUP_NAME, 
AllEcsConfigKeys.RUN_TASK_KWARGS, fallback=dict())
+    return json.loads(str(run_task_kwargs_value))
+
+
+def _fetch_explicit_kwargs() -> dict[str, str]:
+    return prune_dict(
+        {key: conf.get(CONFIG_GROUP_NAME, key, fallback=None) for key in 
RunTaskKwargsConfigKeys()}
+    )
+
+
+def build_task_kwargs() -> dict:
+    # This will put some kwargs at the root of the dictionary that do NOT 
belong there. However,
+    # the code below expects them to be there and will rearrange them as 
necessary.
+    task_kwargs = deepcopy(RUN_TASK_KWARG_DEFAULTS)
+    task_kwargs.update(_fetch_templated_kwargs())
+    task_kwargs.update(_fetch_explicit_kwargs())
+
+    # There can only be 1 count of these containers
+    task_kwargs["count"] = 1  # type: ignore
+    # There could be a generic approach to the below, but likely more 
convoluted than just manually ensuring
+    # the one nested config we need to update is present. If we need to 
override more options in the future we
+    # should revisit this.
+    if "overrides" not in task_kwargs:
+        task_kwargs["overrides"] = {}  # type: ignore
+    if "containerOverrides" not in task_kwargs["overrides"]:
+        task_kwargs["overrides"]["containerOverrides"] = [{}]  # type: ignore
+    task_kwargs["overrides"]["containerOverrides"][0]["name"] = 
task_kwargs.pop(  # type: ignore
+        AllEcsConfigKeys.CONTAINER_NAME
+    )
+    # The executor will overwrite the 'command' property during execution. 
Must always be the first container!
+    task_kwargs["overrides"]["containerOverrides"][0]["command"] = []  # type: 
ignore
+
+    if any(
+        [
+            subnets := task_kwargs.pop(AllEcsConfigKeys.SUBNETS, None),
+            security_groups := 
task_kwargs.pop(AllEcsConfigKeys.SECURITY_GROUPS, None),
+            # Surrounding parens are for the walrus operator to function 
correctly along with the None check
+            (assign_public_ip := 
task_kwargs.pop(AllEcsConfigKeys.ASSIGN_PUBLIC_IP, None)) is not None,
+        ]
+    ):
+        network_config = prune_dict(
+            {
+                "awsvpcConfiguration": {
+                    "subnets": str(subnets).split(",") if subnets else None,
+                    "securityGroups": str(security_groups).split(",") if 
security_groups else None,
+                    "assignPublicIp": parse_assign_public_ip(assign_public_ip),
+                }
+            }
+        )
+
+        if "subnets" not in network_config["awsvpcConfiguration"]:
+            raise ValueError("At least one subnet is required to run a task.")
+
+        task_kwargs["networkConfiguration"] = network_config
+
+    task_kwargs = convert_dict_keys_camel_case(task_kwargs)
+
+    try:
+        json.loads(json.dumps(task_kwargs))
+    except JSONDecodeError:
+        raise ValueError(f"AWS ECS Executor config values must be JSON 
serializable. Got {task_kwargs}")

Review Comment:
   overrides/containerOverrides might contain some sensitive information, so we 
should not expose it in logs
   ```suggestion
        raise ValueError("AWS ECS Executor config values must be JSON 
serializable.")
   ```



##########
airflow/providers/amazon/aws/executors/ecs/__init__.py:
##########
@@ -0,0 +1,335 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor.
+
+Each Airflow task gets delegated out to an Amazon ECS Task.
+"""
+
+from __future__ import annotations
+
+import time
+from collections import defaultdict, deque
+from copy import deepcopy
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.executors.base_executor import BaseExecutor
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoDescribeTasksSchema, BotoRunTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_DEFAULTS,
+    CONFIG_GROUP_NAME,
+    AllEcsConfigKeys,
+    EcsExecutorException,
+    EcsQueuedTask,
+    EcsTaskCollection,
+)
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+    from airflow.providers.amazon.aws.executors.ecs.utils import (
+        CommandType,
+        ExecutorConfigType,
+    )
+
+
+class AwsEcsExecutor(BaseExecutor):
+    """
+    Executes the provided Airflow command on an ECS instance.
+
+    The Airflow Scheduler creates a shell command, and passes it to the 
executor. This ECS Executor
+    runs said Airflow command on a remote Amazon ECS Cluster with a 
task-definition configured to
+    launch the same containers as the Scheduler. It then periodically checks 
in with the launched
+    tasks (via task ARNs) to determine the status.
+
+    This allows individual tasks to specify CPU, memory, GPU, env variables, 
etc. When initializing a task,
+    there's an option for "executor config" which should be a dictionary with 
keys that match the
+    ``ContainerOverride`` definition per AWS documentation (see link below).
+
+    Prerequisite: proper configuration of Boto3 library
+    .. seealso:: 
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
 for
+    authentication and access-key management. You can store an environment 
variable, set up the AWS config from
+    console, or use IAM roles.
+
+    .. seealso:: 
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html
 for an
+     Airflow TaskInstance's executor_config.
+    """
+
+    # Maximum number of retries to run an ECS task.
+    MAX_RUN_TASK_ATTEMPTS = conf.get(
+        CONFIG_GROUP_NAME,
+        AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
+        fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS],
+    )
+
+    # AWS limits the maximum number of ARNs in the describe_tasks function.
+    DESCRIBE_TASKS_BATCH_SIZE = 99
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.active_workers: EcsTaskCollection = EcsTaskCollection()
+        self.pending_tasks: deque = deque()
+
+        self.cluster = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER)
+        self.container_name = conf.get(CONFIG_GROUP_NAME, 
AllEcsConfigKeys.CONTAINER_NAME)
+        aws_conn_id = conf.get(
+            CONFIG_GROUP_NAME,
+            AllEcsConfigKeys.AWS_CONN_ID,
+            fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.AWS_CONN_ID],
+        )
+        region = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.REGION)
+        from airflow.providers.amazon.aws.hooks.ecs import EcsHook
+
+        self.ecs = EcsHook(aws_conn_id=aws_conn_id, region_name=region).conn

Review Comment:
   ```suggestion
           self.ecs = EcsHook(aws_conn_id=aws_conn_id, 
region_name=region_name).conn
   ```



##########
airflow/providers/amazon/aws/config_templates/config.yml:
##########
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+
+aws_ecs_executor:
+  description: |
+    This section only applies if you are using the AwsEcsExecutor in
+    Airflow's ``[core]`` configuration.
+    For more information on any of these execution parameters, see the link 
below:
+    
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/run_task.html
+    For boto3 credential management, see
+    
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
+  options:
+    conn_id:
+      description: |
+        The Airflow connection (i.e. credentials) used by the ECS executor to 
make API calls to AWS ECS.
+      version_added: "2.8"
+      type: string
+      example: "aws_default"
+      default: "aws_default"
+    region:

Review Comment:
   ```suggestion
       region_name:
   ```
   
   Let's make it more consistent to boto3, Hooks, and most of the 
Operators/Sensors



##########
airflow/providers/amazon/aws/executors/ecs/__init__.py:
##########
@@ -0,0 +1,335 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor.
+
+Each Airflow task gets delegated out to an Amazon ECS Task.
+"""
+
+from __future__ import annotations
+
+import time
+from collections import defaultdict, deque
+from copy import deepcopy
+from typing import TYPE_CHECKING
+
+from airflow.configuration import conf
+from airflow.executors.base_executor import BaseExecutor
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoDescribeTasksSchema, BotoRunTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.utils import (
+    CONFIG_DEFAULTS,
+    CONFIG_GROUP_NAME,
+    AllEcsConfigKeys,
+    EcsExecutorException,
+    EcsQueuedTask,
+    EcsTaskCollection,
+)
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+    from airflow.providers.amazon.aws.executors.ecs.utils import (
+        CommandType,
+        ExecutorConfigType,
+    )
+
+
+class AwsEcsExecutor(BaseExecutor):
+    """
+    Executes the provided Airflow command on an ECS instance.
+
+    The Airflow Scheduler creates a shell command, and passes it to the 
executor. This ECS Executor
+    runs said Airflow command on a remote Amazon ECS Cluster with a 
task-definition configured to
+    launch the same containers as the Scheduler. It then periodically checks 
in with the launched
+    tasks (via task ARNs) to determine the status.
+
+    This allows individual tasks to specify CPU, memory, GPU, env variables, 
etc. When initializing a task,
+    there's an option for "executor config" which should be a dictionary with 
keys that match the
+    ``ContainerOverride`` definition per AWS documentation (see link below).
+
+    Prerequisite: proper configuration of Boto3 library
+    .. seealso:: 
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
 for
+    authentication and access-key management. You can store an environment 
variable, set up the AWS config from
+    console, or use IAM roles.
+
+    .. seealso:: 
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html
 for an
+     Airflow TaskInstance's executor_config.
+    """
+
+    # Maximum number of retries to run an ECS task.
+    MAX_RUN_TASK_ATTEMPTS = conf.get(
+        CONFIG_GROUP_NAME,
+        AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
+        fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS],
+    )
+
+    # AWS limits the maximum number of ARNs in the describe_tasks function.
+    DESCRIBE_TASKS_BATCH_SIZE = 99
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.active_workers: EcsTaskCollection = EcsTaskCollection()
+        self.pending_tasks: deque = deque()
+
+        self.cluster = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER)
+        self.container_name = conf.get(CONFIG_GROUP_NAME, 
AllEcsConfigKeys.CONTAINER_NAME)
+        aws_conn_id = conf.get(
+            CONFIG_GROUP_NAME,
+            AllEcsConfigKeys.AWS_CONN_ID,
+            fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.AWS_CONN_ID],
+        )
+        region = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.REGION)

Review Comment:
   ```suggestion
           region_name = conf.get(CONFIG_GROUP_NAME, 
AllEcsConfigKeys.REGION_NAME)
   ```



##########
airflow/providers/amazon/aws/executors/ecs/utils.py:
##########
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor Utilities.
+
+Data classes and utility functions used by the ECS executor.
+"""
+
+from __future__ import annotations
+
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Callable, Dict, List
+
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+
+CommandType = List[str]
+ExecutorConfigFunctionType = Callable[[CommandType], dict]
+ExecutorConfigType = Dict[str, Any]
+
+CONFIG_GROUP_NAME = "aws_ecs_executor"
+
+RUN_TASK_KWARG_DEFAULTS = {
+    "assign_public_ip": "False",
+    "launch_type": "FARGATE",
+    "platform_version": "LATEST",
+}
+
+CONFIG_DEFAULTS = {
+    "conn_id": "aws_default",
+    "max_run_task_attempts": "3",
+    **RUN_TASK_KWARG_DEFAULTS,
+}
+
+
+@dataclass
+class EcsQueuedTask:
+    """Represents an ECS task that is queued. The task will be run in the next 
heartbeat."""
+
+    key: TaskInstanceKey
+    command: CommandType
+    queue: str
+    executor_config: ExecutorConfigType
+    attempt_number: int
+
+
+@dataclass
+class EcsTaskInfo:
+    """Contains information about a currently running ECS task."""
+
+    cmd: CommandType
+    queue: str
+    config: ExecutorConfigType
+
+
+class BaseConfigKeys:
+    """Base Implementation of the Config Keys class. Implements iteration for 
child classes to inherit."""
+
+    def __iter__(self):
+        return iter({value for (key, value) in self.__class__.__dict__.items() 
if not key.startswith("__")})
+
+
+class RunTaskKwargsConfigKeys(BaseConfigKeys):
+    """Keys loaded into the config which are valid ECS run_task kwargs."""
+
+    ASSIGN_PUBLIC_IP = "assign_public_ip"
+    CLUSTER = "cluster"
+    LAUNCH_TYPE = "launch_type"
+    PLATFORM_VERSION = "platform_version"
+    SECURITY_GROUPS = "security_groups"
+    SUBNETS = "subnets"
+    TASK_DEFINITION = "task_definition"
+    CONTAINER_NAME = "container_name"
+    MAX_RUN_TASK_ATTEMPTS = "max_run_task_attempts"
+
+
+class AllEcsConfigKeys(RunTaskKwargsConfigKeys):
+    """All keys loaded into the config which are related to the ECS 
Executor."""
+
+    AWS_CONN_ID = "conn_id"
+    RUN_TASK_KWARGS = "run_task_kwargs"
+    REGION = "region"

Review Comment:
   ```suggestion
       REGION_NAME = "region_name"
   ```



##########
airflow/providers/amazon/aws/config_templates/config.yml:
##########
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+
+aws_ecs_executor:
+  description: |
+    This section only applies if you are using the AwsEcsExecutor in
+    Airflow's ``[core]`` configuration.
+    For more information on any of these execution parameters, see the link 
below:
+    
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/run_task.html
+    For boto3 credential management, see
+    
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
+  options:
+    conn_id:
+      description: |
+        The Airflow connection (i.e. credentials) used by the ECS executor to 
make API calls to AWS ECS.
+      version_added: "2.8"
+      type: string
+      example: "aws_default"
+      default: "aws_default"
+    region:
+      description: |
+        The name of the AWS Region where Amazon ECS is configured. Required.

Review Comment:
   Is it really required? The user might set it in the Connection, or as the 
`AWS_REGION` / `AWS_DEFAULT_REGION` environment variables?



##########
airflow/providers/amazon/aws/executors/ecs/utils.py:
##########
@@ -0,0 +1,272 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+AWS ECS Executor Utilities.
+
+Data classes and utility functions used by the ECS executor.
+"""
+
+from __future__ import annotations
+
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any, Callable, Dict, List
+
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstanceKey
+
+CommandType = List[str]
+ExecutorConfigFunctionType = Callable[[CommandType], dict]
+ExecutorConfigType = Dict[str, Any]
+
+CONFIG_GROUP_NAME = "aws_ecs_executor"
+
+RUN_TASK_KWARG_DEFAULTS = {
+    "assign_public_ip": "False",
+    "launch_type": "FARGATE",
+    "platform_version": "LATEST",
+}
+
+CONFIG_DEFAULTS = {
+    "conn_id": "aws_default",
+    "max_run_task_attempts": "3",
+    **RUN_TASK_KWARG_DEFAULTS,
+}
+
+
+@dataclass
+class EcsQueuedTask:
+    """Represents an ECS task that is queued. The task will be run in the next 
heartbeat."""
+
+    key: TaskInstanceKey
+    command: CommandType
+    queue: str
+    executor_config: ExecutorConfigType
+    attempt_number: int
+
+
+@dataclass
+class EcsTaskInfo:
+    """Contains information about a currently running ECS task."""
+
+    cmd: CommandType
+    queue: str
+    config: ExecutorConfigType
+
+
+class BaseConfigKeys:
+    """Base Implementation of the Config Keys class. Implements iteration for 
child classes to inherit."""
+
+    def __iter__(self):
+        return iter({value for (key, value) in self.__class__.__dict__.items() 
if not key.startswith("__")})
+
+
+class RunTaskKwargsConfigKeys(BaseConfigKeys):
+    """Keys loaded into the config which are valid ECS run_task kwargs."""
+
+    ASSIGN_PUBLIC_IP = "assign_public_ip"
+    CLUSTER = "cluster"
+    LAUNCH_TYPE = "launch_type"
+    PLATFORM_VERSION = "platform_version"
+    SECURITY_GROUPS = "security_groups"
+    SUBNETS = "subnets"
+    TASK_DEFINITION = "task_definition"
+    CONTAINER_NAME = "container_name"
+    MAX_RUN_TASK_ATTEMPTS = "max_run_task_attempts"
+
+
+class AllEcsConfigKeys(RunTaskKwargsConfigKeys):
+    """All keys loaded into the config which are related to the ECS 
Executor."""
+
+    AWS_CONN_ID = "conn_id"
+    RUN_TASK_KWARGS = "run_task_kwargs"
+    REGION = "region"
+
+
+class EcsExecutorException(Exception):
+    """Thrown when something unexpected has occurred within the ECS 
ecosystem."""
+
+
+class EcsExecutorTask:
+    """Data Transfer Object for an ECS Fargate Task."""
+
+    def __init__(
+        self,
+        task_arn: str,
+        last_status: str,
+        desired_status: str,
+        containers: list[dict[str, Any]],
+        started_at: Any | None = None,
+        stopped_reason: str | None = None,
+    ):
+        self.task_arn = task_arn
+        self.last_status = last_status
+        self.desired_status = desired_status
+        self.containers = containers
+        self.started_at = started_at
+        self.stopped_reason = stopped_reason
+
+    def get_task_state(self) -> str:
+        """
+        This is the primary logic that handles state in an ECS task.
+
+        It will determine if a status is:
+            QUEUED - Task is being provisioned.
+            RUNNING - Task is launched on ECS.
+            REMOVED - Task provisioning has failed for some reason. See 
`stopped_reason`.
+            FAILED - Task is completed and at least one container has failed.
+            SUCCESS - Task is completed and all containers have succeeded.
+        """
+        if self.last_status == "RUNNING":
+            return State.RUNNING
+        elif self.desired_status == "RUNNING":
+            return State.QUEUED
+        is_finished = self.desired_status == "STOPPED"
+        has_exit_codes = all(["exit_code" in x for x in self.containers])
+        # Sometimes ECS tasks may time out.
+        if not self.started_at and is_finished:
+            return State.REMOVED
+        if not is_finished or not has_exit_codes:
+            return State.RUNNING
+        all_containers_succeeded = all([x["exit_code"] == 0 for x in 
self.containers])
+        return State.SUCCESS if all_containers_succeeded else State.FAILED
+
+    def __repr__(self):
+        return f"({self.task_arn}, {self.last_status}->{self.desired_status}, 
{self.get_task_state()})"
+
+
+class EcsTaskCollection:
+    """A five-way dictionary between Airflow task ids, Airflow cmds, ECS ARNs, 
and ECS task objects."""
+
+    def __init__(self):
+        self.key_to_arn: dict[TaskInstanceKey, str] = {}
+        self.arn_to_key: dict[str, TaskInstanceKey] = {}
+        self.tasks: dict[str, EcsExecutorTask] = {}
+        self.key_to_failure_counts: dict[TaskInstanceKey, int] = 
defaultdict(int)
+        self.key_to_task_info: dict[TaskInstanceKey, EcsTaskInfo] = {}
+
+    def add_task(
+        self,
+        task: EcsExecutorTask,
+        airflow_task_key: TaskInstanceKey,
+        queue: str,
+        airflow_cmd: CommandType,
+        exec_config: ExecutorConfigType,
+        attempt_number: int,
+    ):
+        """Adds a task to the collection."""
+        arn = task.task_arn
+        self.tasks[arn] = task
+        self.key_to_arn[airflow_task_key] = arn
+        self.arn_to_key[arn] = airflow_task_key
+        self.key_to_task_info[airflow_task_key] = EcsTaskInfo(airflow_cmd, 
queue, exec_config)
+        self.key_to_failure_counts[airflow_task_key] = attempt_number
+
+    def update_task(self, task: EcsExecutorTask):
+        """Updates the state of the given task based on task ARN."""
+        self.tasks[task.task_arn] = task
+
+    def task_by_key(self, task_key: TaskInstanceKey) -> EcsExecutorTask:
+        """Get a task by Airflow Instance Key."""
+        arn = self.key_to_arn[task_key]
+        return self.task_by_arn(arn)
+
+    def task_by_arn(self, arn) -> EcsExecutorTask:
+        """Get a task by AWS ARN."""
+        return self.tasks[arn]
+
+    def pop_by_key(self, task_key: TaskInstanceKey) -> EcsExecutorTask:
+        """Deletes task from collection based off of Airflow Task Instance 
Key."""
+        arn = self.key_to_arn[task_key]
+        task = self.tasks[arn]
+        del self.key_to_arn[task_key]
+        del self.key_to_task_info[task_key]
+        del self.arn_to_key[arn]
+        del self.tasks[arn]
+        if task_key in self.key_to_failure_counts:
+            del self.key_to_failure_counts[task_key]
+        return task
+
+    def get_all_arns(self) -> list[str]:
+        """Get all AWS ARNs in collection."""
+        return list(self.key_to_arn.values())
+
+    def get_all_task_keys(self) -> list[TaskInstanceKey]:
+        """Get all Airflow Task Keys in collection."""
+        return list(self.key_to_arn.keys())
+
+    def failure_count_by_key(self, task_key: TaskInstanceKey) -> int:
+        """Get the number of times a task has failed given an Airflow Task 
Key."""
+        return self.key_to_failure_counts[task_key]
+
+    def increment_failure_count(self, task_key: TaskInstanceKey):
+        """Increment the failure counter given an Airflow Task Key."""
+        self.key_to_failure_counts[task_key] += 1
+
+    def info_by_key(self, task_key: TaskInstanceKey) -> EcsTaskInfo:
+        """Get the Airflow Command given an Airflow task key."""
+        return self.key_to_task_info[task_key]
+
+    def __getitem__(self, value):
+        """Gets a task by AWS ARN."""
+        return self.task_by_arn(value)
+
+    def __len__(self):
+        """Determines the number of tasks in collection."""
+        return len(self.tasks)
+
+
+def _recursive_flatten_dict(nested_dict):
+    """
+    Recursively unpack a nested dict and return it as a flat dict.
+
+    For example, _flatten_dict({'a': 'a', 'b': 'b', 'c': {'d': 'd'}}) returns 
{'a': 'a', 'b': 'b', 'd': 'd'}.
+    """
+    items = []
+    for key, value in nested_dict.items():
+        if isinstance(value, dict):
+            items.extend(_recursive_flatten_dict(value).items())
+        else:
+            items.append((key, value))
+    return dict(items)
+
+
+def parse_assign_public_ip(assign_public_ip):
+    """Convert "assign_public_ip" from True/False to ENABLED/DISABLED."""
+    return "ENABLED" if assign_public_ip == "True" else "DISABLED"
+
+
+def snake_to_camel(_key):
+    split_key = _key.split("_")
+    first_word = split_key[0]
+    return first_word[0].lower() + first_word[1:] + "".join(word.title() for 
word in split_key[1:])

Review Comment:
   Same as 
[inflection.camelize](https://inflection.readthedocs.io/en/latest/#inflection.camelize).
   We already have this really nice library in the constraints: 
https://github.com/apache/airflow/blob/8fa683885f7043ae00a7f7a1cd69202191f0e8d9/constraints-3.8.txt#L371
 — all we need is to add it as a dependency to the Amazon provider and use it instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to