This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 92d1e8c447 Move ECS Executor to its own file (#35418)
92d1e8c447 is described below

commit 92d1e8c447b682dd1e4ecefbe06fe0e335479d0b
Author: Syed Hussain <[email protected]>
AuthorDate: Fri Nov 3 13:38:32 2023 -0700

    Move ECS Executor to its own file (#35418)
    
    * Move ECS Executor from __init__.py to its own file. This improves the 
logging because logs record the filename, and __init__.py was not a helpful 
name.
    
    * Fix failing tests
---
 .../providers/amazon/aws/executors/ecs/__init__.py | 326 ---------------------
 .../executors/ecs/{__init__.py => ecs_executor.py} |   0
 .../executors/ecs-executor.rst                     |   2 +-
 .../amazon/aws/executors/ecs/test_ecs_executor.py  |   6 +-
 tests/providers/amazon/aws/hooks/test_base_aws.py  |   2 +-
 5 files changed, 5 insertions(+), 331 deletions(-)

diff --git a/airflow/providers/amazon/aws/executors/ecs/__init__.py 
b/airflow/providers/amazon/aws/executors/ecs/__init__.py
index 0a63aea963..13a83393a9 100644
--- a/airflow/providers/amazon/aws/executors/ecs/__init__.py
+++ b/airflow/providers/amazon/aws/executors/ecs/__init__.py
@@ -14,329 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-"""
-AWS ECS Executor.
-
-Each Airflow task gets delegated out to an Amazon ECS Task.
-"""
-
-from __future__ import annotations
-
-import time
-from collections import defaultdict, deque
-from copy import deepcopy
-from typing import TYPE_CHECKING
-
-from airflow.configuration import conf
-from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoDescribeTasksSchema, BotoRunTaskSchema
-from airflow.providers.amazon.aws.executors.ecs.utils import (
-    CONFIG_DEFAULTS,
-    CONFIG_GROUP_NAME,
-    AllEcsConfigKeys,
-    EcsExecutorException,
-    EcsQueuedTask,
-    EcsTaskCollection,
-)
-from airflow.utils.state import State
-
-if TYPE_CHECKING:
-    from airflow.models.taskinstance import TaskInstanceKey
-    from airflow.providers.amazon.aws.executors.ecs.utils import (
-        CommandType,
-        ExecutorConfigType,
-    )
-
-
-class AwsEcsExecutor(BaseExecutor):
-    """
-    Executes the provided Airflow command on an ECS instance.
-
-    The Airflow Scheduler creates a shell command, and passes it to the 
executor. This ECS Executor
-    runs said Airflow command on a remote Amazon ECS Cluster with a 
task-definition configured to
-    launch the same containers as the Scheduler. It then periodically checks 
in with the launched
-    tasks (via task ARNs) to determine the status.
-
-    This allows individual tasks to specify CPU, memory, GPU, env variables, 
etc. When initializing a task,
-    there's an option for "executor config" which should be a dictionary with 
keys that match the
-    ``ContainerOverride`` definition per AWS documentation (see link below).
-
-    Prerequisite: proper configuration of Boto3 library
-    .. seealso:: 
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
 for
-    authentication and access-key management. You can store an environmental 
variable, setup aws config from
-    console, or use IAM roles.
-
-    .. seealso:: 
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html
 for an
-     Airflow TaskInstance's executor_config.
-    """
-
-    # Maximum number of retries to run an ECS task.
-    MAX_RUN_TASK_ATTEMPTS = conf.get(
-        CONFIG_GROUP_NAME,
-        AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
-        fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS],
-    )
-
-    # AWS limits the maximum number of ARNs in the describe_tasks function.
-    DESCRIBE_TASKS_BATCH_SIZE = 99
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.active_workers: EcsTaskCollection = EcsTaskCollection()
-        self.pending_tasks: deque = deque()
-
-        self.cluster = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER)
-        self.container_name = conf.get(CONFIG_GROUP_NAME, 
AllEcsConfigKeys.CONTAINER_NAME)
-        aws_conn_id = conf.get(
-            CONFIG_GROUP_NAME,
-            AllEcsConfigKeys.AWS_CONN_ID,
-            fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.AWS_CONN_ID],
-        )
-        region_name = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.REGION_NAME)
-        from airflow.providers.amazon.aws.hooks.ecs import EcsHook
-
-        self.ecs = EcsHook(aws_conn_id=aws_conn_id, 
region_name=region_name).conn
-        self.run_task_kwargs = self._load_run_kwargs()
-
-    def sync(self):
-        try:
-            self.sync_running_tasks()
-            self.attempt_task_runs()
-        except Exception:
-            # We catch any and all exceptions because otherwise they would 
bubble
-            # up and kill the scheduler process
-            self.log.exception("Failed to sync %s", self.__class__.__name__)
-
-    def sync_running_tasks(self):
-        """Checks and update state on all running tasks."""
-        all_task_arns = self.active_workers.get_all_arns()
-        if not all_task_arns:
-            self.log.debug("No active Airflow tasks, skipping sync.")
-            return
-
-        describe_tasks_response = self.__describe_tasks(all_task_arns)
-        self.log.debug("Active Workers: %s", describe_tasks_response)
-
-        if describe_tasks_response["failures"]:
-            for failure in describe_tasks_response["failures"]:
-                self.__handle_failed_task(failure["arn"], failure["reason"])
-
-        updated_tasks = describe_tasks_response["tasks"]
-        for task in updated_tasks:
-            self.__update_running_task(task)
-
-    def __update_running_task(self, task):
-        self.active_workers.update_task(task)
-        # Get state of current task.
-        task_state = task.get_task_state()
-        task_key = self.active_workers.arn_to_key[task.task_arn]
-        # Mark finished tasks as either a success/failure.
-        if task_state == State.FAILED:
-            self.fail(task_key)
-        elif task_state == State.SUCCESS:
-            self.success(task_key)
-        elif task_state == State.REMOVED:
-            self.__handle_failed_task(task.task_arn, task.stopped_reason)
-        if task_state in (State.FAILED, State.SUCCESS):
-            self.log.debug(
-                "Airflow task %s marked as %s after running on %s",
-                task_key,
-                task_state,
-                task.task_arn,
-            )
-            self.active_workers.pop_by_key(task_key)
-
-    def __describe_tasks(self, task_arns):
-        all_task_descriptions = {"tasks": [], "failures": []}
-        for i in range(0, len(task_arns), self.DESCRIBE_TASKS_BATCH_SIZE):
-            batched_task_arns = task_arns[i : i + 
self.DESCRIBE_TASKS_BATCH_SIZE]
-            if not batched_task_arns:
-                continue
-            boto_describe_tasks = 
self.ecs.describe_tasks(tasks=batched_task_arns, cluster=self.cluster)
-            describe_tasks_response = 
BotoDescribeTasksSchema().load(boto_describe_tasks)
-
-            
all_task_descriptions["tasks"].extend(describe_tasks_response["tasks"])
-            
all_task_descriptions["failures"].extend(describe_tasks_response["failures"])
-        return all_task_descriptions
-
-    def __handle_failed_task(self, task_arn: str, reason: str):
-        """If an API failure occurs, the task is rescheduled."""
-        task_key = self.active_workers.arn_to_key[task_arn]
-        task_info = self.active_workers.info_by_key(task_key)
-        task_cmd = task_info.cmd
-        queue = task_info.queue
-        exec_info = task_info.config
-        failure_count = self.active_workers.failure_count_by_key(task_key)
-        if int(failure_count) < int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
-            self.log.warning(
-                "Airflow task %s failed due to %s. Failure %s out of %s 
occurred on %s. Rescheduling.",
-                task_key,
-                reason,
-                failure_count,
-                self.__class__.MAX_RUN_TASK_ATTEMPTS,
-                task_arn,
-            )
-            self.active_workers.increment_failure_count(task_key)
-            self.pending_tasks.appendleft(
-                EcsQueuedTask(task_key, task_cmd, queue, exec_info, 
failure_count + 1)
-            )
-        else:
-            self.log.error(
-                "Airflow task %s has failed a maximum of %s times. Marking as 
failed",
-                task_key,
-                failure_count,
-            )
-            self.active_workers.pop_by_key(task_key)
-            self.fail(task_key)
-
-    def attempt_task_runs(self):
-        """
-        Takes tasks from the pending_tasks queue, and attempts to find an 
instance to run it on.
-
-        If the launch type is EC2, this will attempt to place tasks on empty 
EC2 instances.  If
-            there are no EC2 instances available, no task is placed and this 
function will be
-            called again in the next heart-beat.
-
-        If the launch type is FARGATE, this will run the tasks on new AWS 
Fargate instances.
-        """
-        queue_len = len(self.pending_tasks)
-        failure_reasons = defaultdict(int)
-        for _ in range(queue_len):
-            ecs_task = self.pending_tasks.popleft()
-            task_key = ecs_task.key
-            cmd = ecs_task.command
-            queue = ecs_task.queue
-            exec_config = ecs_task.executor_config
-            attempt_number = ecs_task.attempt_number
-            _failure_reasons = []
-            try:
-                run_task_response = self._run_task(task_key, cmd, queue, 
exec_config)
-            except Exception as e:
-                # Failed to even get a response back from the Boto3 API or 
something else went
-                # wrong.  For any possible failure we want to add the 
exception reasons to the
-                # failure list so that it is logged to the user and most 
importantly the task is
-                # added back to the pending list to be retried later.
-                _failure_reasons.append(str(e))
-            else:
-                # We got a response back, check if there were failures. If so, 
add them to the
-                # failures list so that it is logged to the user and most 
importantly the task
-                # is added back to the pending list to be retried later.
-                if run_task_response["failures"]:
-                    _failure_reasons.extend([f["reason"] for f in 
run_task_response["failures"]])
-
-            if _failure_reasons:
-                for reason in _failure_reasons:
-                    failure_reasons[reason] += 1
-                # Make sure the number of attempts does not exceed 
MAX_RUN_TASK_ATTEMPTS
-                if int(attempt_number) <= 
int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
-                    ecs_task.attempt_number += 1
-                    self.pending_tasks.appendleft(ecs_task)
-                else:
-                    self.log.error(
-                        "ECS task %s has failed a maximum of %s times. Marking 
as failed",
-                        task_key,
-                        attempt_number,
-                    )
-                    self.fail(task_key)
-            elif not run_task_response["tasks"]:
-                self.log.error("ECS RunTask Response: %s", run_task_response)
-                raise EcsExecutorException(
-                    "No failures and no ECS tasks provided in response. This 
should never happen."
-                )
-            else:
-                task = run_task_response["tasks"][0]
-                self.active_workers.add_task(task, task_key, queue, cmd, 
exec_config, attempt_number)
-        if failure_reasons:
-            self.log.error(
-                "Pending ECS tasks failed to launch for the following reasons: 
%s. Retrying later.",
-                dict(failure_reasons),
-            )
-
-    def _run_task(
-        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, 
exec_config: ExecutorConfigType
-    ):
-        """
-        Run a queued-up Airflow task.
-
-        Not to be confused with execute_async() which inserts tasks into the 
queue.
-        The command and executor config will be placed in the 
container-override
-        section of the JSON request before calling Boto3's "run_task" function.
-        """
-        run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
-        boto_run_task = self.ecs.run_task(**run_task_api)
-        run_task_response = BotoRunTaskSchema().load(boto_run_task)
-        return run_task_response
-
-    def _run_task_kwargs(
-        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, 
exec_config: ExecutorConfigType
-    ) -> dict:
-        """
-        Overrides the Airflow command to update the container overrides so 
kwargs are specific to this task.
-
-        One last chance to modify Boto3's "run_task" kwarg params before it 
gets passed into the Boto3 client.
-        """
-        run_task_api = deepcopy(self.run_task_kwargs)
-        container_override = 
self.get_container(run_task_api["overrides"]["containerOverrides"])
-        container_override["command"] = cmd
-        container_override.update(exec_config)
-
-        # Inject the env variable to configure logging for containerized 
execution environment
-        if "environment" not in container_override:
-            container_override["environment"] = []
-        container_override["environment"].append({"name": 
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})
-
-        return run_task_api
-
-    def execute_async(self, key: TaskInstanceKey, command: CommandType, 
queue=None, executor_config=None):
-        """Save the task to be executed in the next sync by inserting the 
commands into a queue."""
-        if executor_config and ("name" in executor_config or "command" in 
executor_config):
-            raise ValueError('Executor Config should never override "name" or 
"command"')
-        self.pending_tasks.append(EcsQueuedTask(key, command, queue, 
executor_config or {}, 1))
-
-    def end(self, heartbeat_interval=10):
-        """Waits for all currently running tasks to end, and doesn't launch 
any tasks."""
-        try:
-            while True:
-                self.sync()
-                if not self.active_workers:
-                    break
-                time.sleep(heartbeat_interval)
-        except Exception:
-            # We catch any and all exceptions because otherwise they would 
bubble
-            # up and kill the scheduler process.
-            self.log.exception("Failed to end %s", self.__class__.__name__)
-
-    def terminate(self):
-        """Kill all ECS processes by calling Boto3's StopTask API."""
-        try:
-            for arn in self.active_workers.get_all_arns():
-                self.ecs.stop_task(
-                    cluster=self.cluster, task=arn, reason="Airflow Executor 
received a SIGTERM"
-                )
-            self.end()
-        except Exception:
-            # We catch any and all exceptions because otherwise they would 
bubble
-            # up and kill the scheduler process.
-            self.log.exception("Failed to terminate %s", 
self.__class__.__name__)
-
-    def _load_run_kwargs(self) -> dict:
-        from airflow.providers.amazon.aws.executors.ecs.ecs_executor_config 
import build_task_kwargs
-
-        ecs_executor_run_task_kwargs = build_task_kwargs()
-
-        try:
-            
self.get_container(ecs_executor_run_task_kwargs["overrides"]["containerOverrides"])["command"]
-        except KeyError:
-            raise KeyError(
-                "Rendered JSON template does not contain key "
-                '"overrides[containerOverrides][containers][x][command]"'
-            )
-        return ecs_executor_run_task_kwargs
-
-    def get_container(self, container_list):
-        """Searches task list for core Airflow container."""
-        for container in container_list:
-            if container["name"] == self.container_name:
-                return container
-        raise KeyError(f"No such container found by container name: 
{self.container_name}")
diff --git a/airflow/providers/amazon/aws/executors/ecs/__init__.py 
b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
similarity index 100%
copy from airflow/providers/amazon/aws/executors/ecs/__init__.py
copy to airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst 
b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
index b09e6afff0..a50e30720c 100644
--- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -557,7 +557,7 @@ To configure Airflow to utilize the ECS Executor and 
leverage the resources we'v
 
 .. code-block:: bash
 
-   export 
AIRFLOW**CORE**EXECUTOR='airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor'
+   export 
AIRFLOW**CORE**EXECUTOR='airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor'
 
    export AIRFLOW**DATABASE**SQL*ALCHEMY*CONN=<postgres-connection-string>
 
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py 
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 8333de36a8..e749ed6bba 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -30,14 +30,14 @@ import yaml
 from inflection import camelize
 
 from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.amazon.aws.executors.ecs import (
+from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import (
     CONFIG_GROUP_NAME,
     AllEcsConfigKeys,
     AwsEcsExecutor,
     EcsTaskCollection,
-    ecs_executor_config,
 )
-from airflow.providers.amazon.aws.executors.ecs.boto_schema import 
BotoTaskSchema
 from airflow.providers.amazon.aws.executors.ecs.utils import (
     CONFIG_DEFAULTS,
     EcsExecutorTask,
diff --git a/tests/providers/amazon/aws/hooks/test_base_aws.py 
b/tests/providers/amazon/aws/hooks/test_base_aws.py
index a6395f4667..2ea511c45d 100644
--- a/tests/providers/amazon/aws/hooks/test_base_aws.py
+++ b/tests/providers/amazon/aws/hooks/test_base_aws.py
@@ -41,7 +41,7 @@ from moto.core import DEFAULT_ACCOUNT_ID
 
 from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.models.connection import Connection
-from airflow.providers.amazon.aws.executors.ecs import AwsEcsExecutor
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import 
AwsEcsExecutor
 from airflow.providers.amazon.aws.hooks.base_aws import (
     AwsBaseHook,
     AwsGenericHook,

Reply via email to