This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae9a7b8188 ECS Executor Health Check (#35412)
ae9a7b8188 is described below
commit ae9a7b8188514987bc6ae2aaf1f0332b680f384a
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Nov 3 15:31:33 2023 -0700
ECS Executor Health Check (#35412)
During startup the Scheduler calls start() on the configured Executor.
Attempt an API call to ECS via the Boto client in this method to test the
health of the ECS Executor.
This will test most of the machinery of the executor (credentials,
permissions, configuration, etc).
If the check fails and the executor is unhealthy, don't allow the scheduler
to continue starting up; fail hard and clearly tell the user what the issue is.
---------
Co-authored-by: ferruzzi <[email protected]>
Co-authored-by: Vincent <[email protected]>
---
.../amazon/aws/executors/ecs/ecs_executor.py | 57 ++++++++++++++
.../providers/amazon/aws/executors/ecs/utils.py | 2 +
airflow/providers/amazon/provider.yaml | 7 ++
.../executors/ecs-executor.rst | 2 +
.../amazon/aws/executors/ecs/test_ecs_executor.py | 90 ++++++++++++++++++++--
5 files changed, 153 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
index 0a63aea963..5d5bef6b87 100644
--- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
+++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
@@ -28,7 +28,10 @@ from collections import defaultdict, deque
from copy import deepcopy
from typing import TYPE_CHECKING
+from botocore.exceptions import ClientError
+
from airflow.configuration import conf
+from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.amazon.aws.executors.ecs.boto_schema import
BotoDescribeTasksSchema, BotoRunTaskSchema
from airflow.providers.amazon.aws.executors.ecs.utils import (
@@ -99,6 +102,60 @@ class AwsEcsExecutor(BaseExecutor):
self.ecs = EcsHook(aws_conn_id=aws_conn_id,
region_name=region_name).conn
self.run_task_kwargs = self._load_run_kwargs()
def start(self):
    """
    Make a test API call to check the health of the ECS Executor.

    The Scheduler calls this once during startup. It does nothing unless the
    ``check_health_on_startup`` option of the executor's config section is enabled.

    The check deliberately stops a task with an invalid task ID. Possible outcomes, in order:
    1. "AccessDeniedException" is raised if there are insufficient permissions.
    2. "ClusterNotFoundException" is raised if permissions exist but the cluster does not.
    3. The API responds with a failure message if the cluster is found and there
       are permissions, but the cluster itself has issues.
    4. "InvalidParameterException" is raised if the permissions and cluster exist
       but the task does not.

    The last one is considered a success state for the purposes of this check.

    :raises AirflowException: if the health check fails for any reason. This includes
        errors that are not ``ClientError``, such as missing credentials or an
        unreachable endpoint. When an underlying exception exists, it is chained
        as the ``__cause__``.
    """
    check_health = conf.getboolean(
        CONFIG_GROUP_NAME, AllEcsConfigKeys.CHECK_HEALTH_ON_STARTUP, fallback=False
    )

    if not check_health:
        return

    self.log.info("Starting ECS Executor and determining health...")

    success_status = "succeeded."
    status = success_status
    # Underlying error (if any), chained onto the raised AirflowException for debugging.
    cause = None

    try:
        invalid_task_id = "a" * 32
        self.ecs.stop_task(cluster=self.cluster, task=invalid_task_id)

        # If it got this far, something is wrong. stop_task() called with an
        # invalid taskID should have thrown a ClientError. All known reasons are
        # covered in the ``except`` blocks below, and this should never be reached.
        status = "failed for an unknown reason. "
    except ClientError as ex:
        # Read defensively: a malformed error response must not turn into a KeyError.
        error = ex.response.get("Error", {})
        error_code = error.get("Code", "")
        error_message = error.get("Message", "")

        if ("InvalidParameterException" in error_code) and ("task was not found" in error_message):
            # This failure is expected, and means we're healthy
            pass
        else:
            # Catch all for unexpected API failures
            status = f"failed because: {error_message or ex}. "
            cause = ex
    except Exception as ex:
        # Failures that are not ClientError (e.g. botocore's NoCredentialsError or
        # EndpointConnectionError) are health-check failures too. Without this
        # branch, the check would report success and then let the raw error escape.
        status = f"failed because: {ex}. "
        cause = ex

    msg_prefix = "ECS Executor health check has %s"
    if status == success_status:
        self.log.info(msg_prefix, status)
        return

    msg_error_suffix = (
        "The ECS executor will not be able to run Airflow tasks until the issue is addressed. "
        "Stopping the Airflow Scheduler from starting until the issue is resolved."
    )
    raise AirflowException(msg_prefix % status + msg_error_suffix) from cause
+
def sync(self):
try:
self.sync_running_tasks()
diff --git a/airflow/providers/amazon/aws/executors/ecs/utils.py
b/airflow/providers/amazon/aws/executors/ecs/utils.py
index 4b5c69cb68..4966fa3d2b 100644
--- a/airflow/providers/amazon/aws/executors/ecs/utils.py
+++ b/airflow/providers/amazon/aws/executors/ecs/utils.py
@@ -46,6 +46,7 @@ CONFIG_DEFAULTS = {
"assign_public_ip": "False",
"launch_type": "FARGATE",
"platform_version": "LATEST",
+ "check_health_on_startup": "True",
}
@@ -96,6 +97,7 @@ class AllEcsConfigKeys(RunTaskKwargsConfigKeys):
AWS_CONN_ID = "conn_id"
RUN_TASK_KWARGS = "run_task_kwargs"
REGION_NAME = "region_name"
+ CHECK_HEALTH_ON_STARTUP = "check_health_on_startup"
class EcsExecutorException(Exception):
diff --git a/airflow/providers/amazon/provider.yaml
b/airflow/providers/amazon/provider.yaml
index 567ca81c3f..dda844ae71 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -887,3 +887,10 @@ config:
type: string
example: '{"tags": {"key": "schema", "value": "1.0"}}'
default: ~
+ check_health_on_startup:
+ description: |
+ Whether to check the health of the ECS Executor on startup by making a test
+ ECS API call. If the check fails, the Airflow Scheduler will not start.
+ version_added: "8.11"
+ type: boolean
+ example: "True"
+ default: "True"
diff --git a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
index a50e30720c..75c4b46b6c 100644
--- a/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
+++ b/docs/apache-airflow-providers-amazon/executors/ecs-executor.rst
@@ -106,6 +106,8 @@ Optional config options:
- MAX_RUN_TASK_ATTEMPTS - The maximum number of times the Ecs Executor
should attempt to run a task. This refers to instances where the task
fails to start (i.e. ECS API failures, container failures etc.)
+- CHECK_HEALTH_ON_STARTUP - Whether to check the health of the ECS Executor
+  on startup. If the check fails, the Airflow Scheduler will not start.
For a more detailed description of available options, including type
hints and examples, see the ``config_templates`` folder in the Amazon
diff --git a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
index e749ed6bba..ef4c012640 100644
--- a/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import datetime as dt
import json
+import logging
import os
import re
import time
@@ -27,8 +28,10 @@ from unittest import mock
import pytest
import yaml
+from botocore.exceptions import ClientError
from inflection import camelize
+from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.amazon.aws.executors.ecs import ecs_executor_config
from airflow.providers.amazon.aws.executors.ecs.boto_schema import
BotoTaskSchema
@@ -98,8 +101,7 @@ def mock_config():
@pytest.fixture
-def mock_executor() -> AwsEcsExecutor:
- """Mock ECS to a repeatable starting state.."""
+def set_env_vars():
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.REGION_NAME}".upper()]
= "us-west-1"
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CLUSTER}".upper()]
= "some-cluster"
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CONTAINER_NAME}".upper()]
= "container-name"
@@ -110,6 +112,11 @@ def mock_executor() -> AwsEcsExecutor:
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SECURITY_GROUPS}".upper()]
= "sg1,sg2"
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.SUBNETS}".upper()]
= "sub1,sub2"
os.environ[f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS}".upper()]
= "3"
+
+
[email protected]
+def mock_executor(set_env_vars) -> AwsEcsExecutor:
+ """Mock ECS to a repeatable starting state.."""
executor = AwsEcsExecutor()
# Replace boto3 ECS client with mock.
@@ -788,9 +795,13 @@ class TestEcsExecutorConfig:
found_keys = {convert_camel_to_snake(key): key for key in
task_kwargs.keys()}
for expected_key, expected_value in CONFIG_DEFAULTS.items():
- # "conn_id" and max_run_task_attempts are used by the executor,
but are not expected to appear
- # in the task_kwargs.
- if expected_key in [AllEcsConfigKeys.AWS_CONN_ID,
AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS]:
+ # conn_id, max_run_task_attempts, and check_health_on_startup are
used by the executor,
+ # but are not expected to appear in the task_kwargs.
+ if expected_key in [
+ AllEcsConfigKeys.AWS_CONN_ID,
+ AllEcsConfigKeys.MAX_RUN_TASK_ATTEMPTS,
+ AllEcsConfigKeys.CHECK_HEALTH_ON_STARTUP,
+ ]:
assert expected_key not in found_keys.keys()
else:
assert expected_key in found_keys.keys()
@@ -919,3 +930,72 @@ class TestEcsExecutorConfig:
assert run_task_kwargs_network_config[camelized_key] ==
"ENABLED"
else:
assert run_task_kwargs_network_config[camelized_key] ==
value.split(",")
+
def test_start_failure_with_invalid_permissions(self, set_env_vars):
    """An AccessDeniedException from ECS aborts startup and surfaces the AWS message."""
    error_response = {
        "Error": {
            "Code": "AccessDeniedException",
            "Message": "no identity-based policy allows the ecs:StopTask action",
        }
    }
    executor = AwsEcsExecutor()

    # Stand-in for the boto3 ECS client whose StopTask call is rejected for permissions.
    mocked_ecs = mock.Mock(spec=executor.ecs)
    mocked_ecs.stop_task.side_effect = ClientError(error_response, "StopTask")
    executor.ecs = mocked_ecs

    expected_message = error_response["Error"]["Message"]
    with pytest.raises(AirflowException, match=expected_message):
        executor.start()
+
def test_start_failure_with_invalid_cluster_name(self, set_env_vars):
    """A ClusterNotFoundException from ECS aborts startup and surfaces the AWS message."""
    missing_cluster = {"Error": {"Code": "ClusterNotFoundException", "Message": "Cluster not found."}}
    executor = AwsEcsExecutor()

    # Stand-in for the boto3 ECS client whose StopTask call reports the missing cluster.
    fake_ecs = mock.Mock(spec=executor.ecs)
    fake_ecs.stop_task.side_effect = ClientError(missing_cluster, "StopTask")
    executor.ecs = fake_ecs

    with pytest.raises(AirflowException, match=missing_cluster["Error"]["Message"]):
        executor.start()
+
def test_start_success(self, set_env_vars, caplog):
    """A "task was not found" InvalidParameterException is treated as a healthy executor."""
    not_found_response = {
        "Error": {"Code": "InvalidParameterException", "Message": "The referenced task was not found."}
    }
    executor = AwsEcsExecutor()

    # The spec is taken from the real client before it is replaced by the mock.
    executor.ecs = mock.Mock(spec=executor.ecs)
    executor.ecs.stop_task.side_effect = ClientError(not_found_response, "StopTask")

    caplog.set_level(logging.DEBUG)
    executor.start()

    assert "succeeded" in caplog.text
+
def test_start_health_check_config(self, set_env_vars):
    """With ``check_health_on_startup`` disabled, start() must not call ECS StopTask at all."""
    executor = AwsEcsExecutor()

    # Replace boto3 ECS client with mock.
    ecs_mock = mock.Mock(spec=executor.ecs)
    mock_resp = {
        "Error": {"Code": "InvalidParameterException", "Message": "The referenced task was not found."}
    }
    ecs_mock.stop_task.side_effect = ClientError(mock_resp, "StopTask")

    executor.ecs = ecs_mock

    # Scope the override to this test only. Assigning to os.environ directly would
    # leave "False" set for every test that runs afterwards and silently disable
    # their health checks, making the failure tests order-dependent.
    env_var = f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.CHECK_HEALTH_ON_STARTUP}".upper()
    with mock.patch.dict(os.environ, {env_var: "False"}):
        executor.start()

    ecs_mock.stop_task.assert_not_called()