This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 92d1e8c447 Move ECS Executor to its own file (#35418)
92d1e8c447 is described below
commit 92d1e8c447b682dd1e4ecefbe06fe0e335479d0b
Author: Syed Hussain <[email protected]>
AuthorDate: Fri Nov 3 13:38:32 2023 -0700
Move ECS Executor to its own file (#35418)
* Move ECS Executor from __init__.py to its own file. This improves logging,
because log records include the originating filename and __init__.py was not a
helpful name.
* Fix failing tests
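For anyone importing the executor class directly, the module path changes with
this move; a minimal sketch of the updated import, assuming no
backwards-compatible alias is re-exported from the package __init__.py:

    # Old location (module body removed by this commit):
    # from airflow.providers.amazon.aws.executors.ecs import AwsEcsExecutor

    # New location after the move:
    from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor

The same dotted path is used for the core executor setting
(airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor), as
shown in the docs change below.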
---
.../providers/amazon/aws/executors/ecs/__init__.py | 326 ---------------------
.../executors/ecs/{__init__.py => ecs_executor.py} | 0
.../executors/ecs-executor.rst | 2 +-
.../amazon/aws/executors/ecs/test_ecs_executor.py | 6 +-
tests/providers/amazon/aws/hooks/test_base_aws.py | 2 +-
5 files changed, 5 insertions(+), 331 deletions(-)
diff --git a/airflow/providers/amazon/aws/executors/ecs/__init__.py b/airflow/providers/amazon/aws/executors/ecs/__init__.py
index 0a63aea963..13a83393a9 100644
--- a/airflow/providers/amazon/aws/executors/ecs/__init__.py
+++ b/airflow/providers/amazon/aws/executors/ecs/__init__.py
@@ -14,329 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-"""
-AWS ECS Executor.
-
-Each Airflow task gets delegated out to an Amazon ECS Task.
-"""
-
-from __future__ import annotations
-
-import time
-from collections import defaultdict, deque
-from copy import deepcopy
-from typing import TYPE_CHECKING
-
-from airflow.configuration import conf
-from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoDescribeTasksSchema, BotoRunTaskSchema
-from airflow.providers.amazon.aws.executors.ecs.utils import (
- CONFIG_DEFAULTS,
- CONFIG_GROUP_NAME,
- AllEcsConfigKeys,
- EcsExecutorException,
- EcsQueuedTask,
- EcsTaskCollection,
-)
-from airflow.utils.state import State
-
-if TYPE_CHECKING:
- from airflow.models.taskinstance import TaskInstanceKey
- from airflow.providers.amazon.aws.executors.ecs.utils import (
- CommandType,
- ExecutorConfigType,
- )
-
-
-class AwsEcsExecutor(BaseExecutor):
- """
- Executes the provided Airflow command on an ECS instance.
-
-    The Airflow Scheduler creates a shell command, and passes it to the executor. This ECS Executor
-    runs said Airflow command on a remote Amazon ECS Cluster with a task-definition configured to
-    launch the same containers as the Scheduler. It then periodically checks in with the launched
-    tasks (via task ARNs) to determine the status.
-
-    This allows individual tasks to specify CPU, memory, GPU, env variables, etc. When initializing a task,
-    there's an option for "executor config" which should be a dictionary with keys that match the
- ``ContainerOverride`` definition per AWS documentation (see link below).
-
- Prerequisite: proper configuration of Boto3 library
-    .. seealso:: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for
-    authentication and access-key management. You can store an environmental variable, setup aws config from
- console, or use IAM roles.
-
-    .. seealso:: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html for an
- Airflow TaskInstance's executor_config.
- """
-
- # Maximum number of retries to run an ECS task.
- MAX_RUN_TASK_ATTEMPTS = conf.get(
- CONFIG_GROUP_NAME,
- AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
- fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS],
- )
-
- # AWS limits the maximum number of ARNs in the describe_tasks function.
- DESCRIBE_TASKS_BATCH_SIZE = 99
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.active_workers: EcsTaskCollection = EcsTaskCollection()
- self.pending_tasks: deque = deque()
-
- self.cluster = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER)
-        self.container_name = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CONTAINER_NAME)
- aws_conn_id = conf.get(
- CONFIG_GROUP_NAME,
- AllEcsConfigKeys.AWS_CONN_ID,
- fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.AWS_CONN_ID],
- )
- region_name = conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.REGION_NAME)
- from airflow.providers.amazon.aws.hooks.ecs import EcsHook
-
-        self.ecs = EcsHook(aws_conn_id=aws_conn_id, region_name=region_name).conn
- self.run_task_kwargs = self._load_run_kwargs()
-
- def sync(self):
- try:
- self.sync_running_tasks()
- self.attempt_task_runs()
- except Exception:
-            # We catch any and all exceptions because otherwise they would bubble
-            # up and kill the scheduler process
- self.log.exception("Failed to sync %s", self.__class__.__name__)
-
- def sync_running_tasks(self):
- """Checks and update state on all running tasks."""
- all_task_arns = self.active_workers.get_all_arns()
- if not all_task_arns:
- self.log.debug("No active Airflow tasks, skipping sync.")
- return
-
- describe_tasks_response = self.__describe_tasks(all_task_arns)
- self.log.debug("Active Workers: %s", describe_tasks_response)
-
- if describe_tasks_response["failures"]:
- for failure in describe_tasks_response["failures"]:
- self.__handle_failed_task(failure["arn"], failure["reason"])
-
- updated_tasks = describe_tasks_response["tasks"]
- for task in updated_tasks:
- self.__update_running_task(task)
-
- def __update_running_task(self, task):
- self.active_workers.update_task(task)
- # Get state of current task.
- task_state = task.get_task_state()
- task_key = self.active_workers.arn_to_key[task.task_arn]
- # Mark finished tasks as either a success/failure.
- if task_state == State.FAILED:
- self.fail(task_key)
- elif task_state == State.SUCCESS:
- self.success(task_key)
- elif task_state == State.REMOVED:
- self.__handle_failed_task(task.task_arn, task.stopped_reason)
- if task_state in (State.FAILED, State.SUCCESS):
- self.log.debug(
- "Airflow task %s marked as %s after running on %s",
- task_key,
- task_state,
- task.task_arn,
- )
- self.active_workers.pop_by_key(task_key)
-
- def __describe_tasks(self, task_arns):
- all_task_descriptions = {"tasks": [], "failures": []}
- for i in range(0, len(task_arns), self.DESCRIBE_TASKS_BATCH_SIZE):
-            batched_task_arns = task_arns[i : i + self.DESCRIBE_TASKS_BATCH_SIZE]
- if not batched_task_arns:
- continue
-            boto_describe_tasks = self.ecs.describe_tasks(tasks=batched_task_arns, cluster=self.cluster)
-            describe_tasks_response = BotoDescribeTasksSchema().load(boto_describe_tasks)
-
-            all_task_descriptions["tasks"].extend(describe_tasks_response["tasks"])
-            all_task_descriptions["failures"].extend(describe_tasks_response["failures"])
- return all_task_descriptions
-
- def __handle_failed_task(self, task_arn: str, reason: str):
- """If an API failure occurs, the task is rescheduled."""
- task_key = self.active_workers.arn_to_key[task_arn]
- task_info = self.active_workers.info_by_key(task_key)
- task_cmd = task_info.cmd
- queue = task_info.queue
- exec_info = task_info.config
- failure_count = self.active_workers.failure_count_by_key(task_key)
- if int(failure_count) < int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
- self.log.warning(
- "Airflow task %s failed due to %s. Failure %s out of %s
occurred on %s. Rescheduling.",
- task_key,
- reason,
- failure_count,
- self.__class__.MAX_RUN_TASK_ATTEMPTS,
- task_arn,
- )
- self.active_workers.increment_failure_count(task_key)
- self.pending_tasks.appendleft(
-                EcsQueuedTask(task_key, task_cmd, queue, exec_info, failure_count + 1)
- )
- else:
- self.log.error(
- "Airflow task %s has failed a maximum of %s times. Marking as
failed",
- task_key,
- failure_count,
- )
- self.active_workers.pop_by_key(task_key)
- self.fail(task_key)
-
- def attempt_task_runs(self):
- """
-        Takes tasks from the pending_tasks queue, and attempts to find an instance to run it on.
-
-        If the launch type is EC2, this will attempt to place tasks on empty EC2 instances. If
-        there are no EC2 instances available, no task is placed and this function will be
- called again in the next heart-beat.
-
-        If the launch type is FARGATE, this will run the tasks on new AWS Fargate instances.
- """
- queue_len = len(self.pending_tasks)
- failure_reasons = defaultdict(int)
- for _ in range(queue_len):
- ecs_task = self.pending_tasks.popleft()
- task_key = ecs_task.key
- cmd = ecs_task.command
- queue = ecs_task.queue
- exec_config = ecs_task.executor_config
- attempt_number = ecs_task.attempt_number
- _failure_reasons = []
- try:
-                run_task_response = self._run_task(task_key, cmd, queue, exec_config)
- except Exception as e:
-                # Failed to even get a response back from the Boto3 API or something else went
-                # wrong. For any possible failure we want to add the exception reasons to the
-                # failure list so that it is logged to the user and most importantly the task is
-                # added back to the pending list to be retried later.
- _failure_reasons.append(str(e))
- else:
-                # We got a response back, check if there were failures. If so, add them to the
-                # failures list so that it is logged to the user and most importantly the task
-                # is added back to the pending list to be retried later.
- if run_task_response["failures"]:
-                    _failure_reasons.extend([f["reason"] for f in run_task_response["failures"]])
-
- if _failure_reasons:
- for reason in _failure_reasons:
- failure_reasons[reason] += 1
-                # Make sure the number of attempts does not exceed MAX_RUN_TASK_ATTEMPTS
-                if int(attempt_number) <= int(self.__class__.MAX_RUN_TASK_ATTEMPTS):
- ecs_task.attempt_number += 1
- self.pending_tasks.appendleft(ecs_task)
- else:
- self.log.error(
- "ECS task %s has failed a maximum of %s times. Marking
as failed",
- task_key,
- attempt_number,
- )
- self.fail(task_key)
- elif not run_task_response["tasks"]:
- self.log.error("ECS RunTask Response: %s", run_task_response)
- raise EcsExecutorException(
- "No failures and no ECS tasks provided in response. This
should never happen."
- )
- else:
- task = run_task_response["tasks"][0]
-                self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number)
- if failure_reasons:
- self.log.error(
- "Pending ECS tasks failed to launch for the following reasons:
%s. Retrying later.",
- dict(failure_reasons),
- )
-
- def _run_task(
-        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, exec_config: ExecutorConfigType
- ):
- """
- Run a queued-up Airflow task.
-
-        Not to be confused with execute_async() which inserts tasks into the queue.
-        The command and executor config will be placed in the container-override
- section of the JSON request before calling Boto3's "run_task" function.
- """
- run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
- boto_run_task = self.ecs.run_task(**run_task_api)
- run_task_response = BotoRunTaskSchema().load(boto_run_task)
- return run_task_response
-
- def _run_task_kwargs(
-        self, task_id: TaskInstanceKey, cmd: CommandType, queue: str, exec_config: ExecutorConfigType
- ) -> dict:
- """
-        Overrides the Airflow command to update the container overrides so kwargs are specific to this task.
-
-        One last chance to modify Boto3's "run_task" kwarg params before it gets passed into the Boto3 client.
- """
- run_task_api = deepcopy(self.run_task_kwargs)
-        container_override = self.get_container(run_task_api["overrides"]["containerOverrides"])
- container_override["command"] = cmd
- container_override.update(exec_config)
-
-        # Inject the env variable to configure logging for containerized execution environment
- if "environment" not in container_override:
- container_override["environment"] = []
- container_override["environment"].append({"name":
"AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})
-
- return run_task_api
-
-    def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
- """Save the task to be executed in the next sync by inserting the
commands into a queue."""
-        if executor_config and ("name" in executor_config or "command" in executor_config):
-            raise ValueError('Executor Config should never override "name" or "command"')
-        self.pending_tasks.append(EcsQueuedTask(key, command, queue, executor_config or {}, 1))
-
- def end(self, heartbeat_interval=10):
- """Waits for all currently running tasks to end, and doesn't launch
any tasks."""
- try:
- while True:
- self.sync()
- if not self.active_workers:
- break
- time.sleep(heartbeat_interval)
- except Exception:
-            # We catch any and all exceptions because otherwise they would bubble
- # up and kill the scheduler process.
- self.log.exception("Failed to end %s", self.__class__.__name__)
-
- def terminate(self):
- """Kill all ECS processes by calling Boto3's StopTask API."""
- try:
- for arn in self.active_workers.get_all_arns():
- self.ecs.stop_task(
-                    cluster=self.cluster, task=arn, reason="Airflow Executor received a SIGTERM"
- )
- self.end()
- except Exception:
-            # We catch any and all exceptions because otherwise they would bubble
- # up and kill the scheduler process.
- self.log.exception("Failed to terminate %s",
self.__class__.__name__)
-
- def _load_run_kwargs(self) -> dict:
-        from airflow.providers.amazon.aws.executors.ecs.ecs_executor_config import build_task_kwargs
-
- ecs_executor_run_task_kwargs = build_task_kwargs()
-
- try:
-            self.get_container(ecs_executor_run_task_kwargs["overrides"]["containerOverrides"])["command"]
- except KeyError:
- raise KeyError(
- "Rendered JSON template does not contain key "
- '"overrides[containerOverrides][containers][x][command]"'
- )
- return ecs_executor_run_task_kwargs
-
- def get_container(self, container_list):
- """Searches task list for core Airflow container."""
- for container in container_list:
- if container["name"] == self.container_name:
- return container
-        raise KeyError(f"No such container found by container name: {self.container_name}")
diff --git a/airflow/providers/amazon/aws/executors/ecs/__init__.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
similarity index 100%
copy from airflow/providers/amazon/aws/executors/ecs/__init__.py
copy to airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
index b09e6afff0..a50e30720c 100644
--- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -557,7 +557,7 @@ To configure Airflow to utilize the ECS Executor and leverage the resources we'v
.. code-block:: bash
-   export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.AwsEcsExecutor'
+   export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor'
    export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index 8333de36a8..e749ed6bba 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -30,14 +30,14 @@ import yaml
from inflection import camelize
from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.amazon.aws.executors.ecs import (
+from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
+from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoTaskSchema
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import (
CONFIG_GROUP_NAME,
AllEcsConfigKeys,
AwsEcsExecutor,
EcsTaskCollection,
- ecs_executor_config,
)
-from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoTaskSchema
from airflow.providers.amazon.aws.executors.ecs.utils import (
CONFIG_DEFAULTS,
EcsExecutorTask,
diff --git a/tests/providers/amazon/aws/hooks/test_base_aws.py b/tests/providers/amazon/aws/hooks/test_base_aws.py
index a6395f4667..2ea511c45d 100644
--- a/tests/providers/amazon/aws/hooks/test_base_aws.py
+++ b/tests/providers/amazon/aws/hooks/test_base_aws.py
@@ -41,7 +41,7 @@ from moto.core import DEFAULT_ACCOUNT_ID
 from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models.connection import Connection
-from airflow.providers.amazon.aws.executors.ecs import AwsEcsExecutor
+from airflow.providers.amazon.aws.executors.ecs.ecs_executor import AwsEcsExecutor
from airflow.providers.amazon.aws.hooks.base_aws import (
AwsBaseHook,
AwsGenericHook,