This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 29358761fe6 AwsLambdaExecutor: Support multi-team configuration (#61321)
29358761fe6 is described below
commit 29358761fe6c76a0dcbc921beff4404bc970a274
Author: Henry Chen <[email protected]>
AuthorDate: Fri Feb 20 04:25:27 2026 +0800
AwsLambdaExecutor: Support multi-team configuration (#61321)
Enable AwsLambdaExecutor to read team-based config correctly
---
.../aws/executors/aws_lambda/lambda_executor.py | 35 ++++++---
.../executors/aws_lambda/test_lambda_executor.py | 87 +++++++++++++++++++++-
2 files changed, 111 insertions(+), 11 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
index 67b23e0f0aa..e1cf8a81244 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
@@ -41,7 +41,7 @@ from
airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry impo
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS
-from airflow.providers.common.compat.sdk import AirflowException, Stats, conf,
timezone
+from airflow.providers.common.compat.sdk import AirflowException, Stats,
timezone
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@@ -61,6 +61,8 @@ class AwsLambdaExecutor(BaseExecutor):
to update task state in Airflow.
"""
+ supports_multi_team: bool = True
+
if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
# In the v3 path, we store workloads, not commands as strings.
# TODO: TaskSDK: move this type change into BaseExecutor
@@ -70,12 +72,23 @@ class AwsLambdaExecutor(BaseExecutor):
super().__init__(*args, **kwargs)
self.pending_tasks: deque = deque()
self.running_tasks: dict[str, TaskInstanceKey] = {}
- self.lambda_function_name = conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.FUNCTION_NAME)
- self.sqs_queue_url = conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.QUEUE_URL)
- self.dlq_url = conf.get(CONFIG_GROUP_NAME, AllLambdaConfigKeys.DLQ_URL)
- self.qualifier = conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.QUALIFIER, fallback=None)
+
+ # Check if self has the ExecutorConf set on the self.conf attribute, and if not, set it to the global
+ # configuration object. This allows the changes to be backwards compatible with older versions of
+ # Airflow.
+ # Can be removed when minimum supported provider version is equal to the version of core airflow
+ # which introduces multi-team configuration.
+ if not hasattr(self, "conf"):
+ from airflow.providers.common.compat.sdk import conf
+
+ self.conf = conf
+
+ self.lambda_function_name = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.FUNCTION_NAME)
+ self.sqs_queue_url = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.QUEUE_URL)
+ self.dlq_url = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.DLQ_URL)
+ self.qualifier = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.QUALIFIER, fallback=None)
# Maximum number of retries to invoke Lambda.
- self.max_invoke_attempts = conf.get(
+ self.max_invoke_attempts = self.conf.get(
CONFIG_GROUP_NAME,
AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS,
)
@@ -86,7 +99,7 @@ class AwsLambdaExecutor(BaseExecutor):
def start(self):
"""Call this when the Executor is run for the first time by the
scheduler."""
- check_health = conf.getboolean(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+ check_health = self.conf.getboolean(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
if not check_health:
return
@@ -152,8 +165,8 @@ class AwsLambdaExecutor(BaseExecutor):
:param check_connection: If True, check the health of the connection
after loading it.
"""
self.log.info("Loading Connections")
- aws_conn_id = conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.AWS_CONN_ID)
- region_name = conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.REGION_NAME, fallback=None)
+ aws_conn_id = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.AWS_CONN_ID)
+ region_name = self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.REGION_NAME, fallback=None)
self.sqs_client = SqsHook(aws_conn_id=aws_conn_id,
region_name=region_name).conn
self.lambda_client = LambdaHook(aws_conn_id=aws_conn_id,
region_name=region_name).conn
@@ -492,7 +505,9 @@ class AwsLambdaExecutor(BaseExecutor):
:param heartbeat_interval: The interval in seconds to wait between
checks for task completion.
"""
self.log.info("Received signal to end, waiting for outstanding tasks
to finish.")
- time_to_wait = int(conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.END_WAIT_TIMEOUT, fallback="0"))
+ time_to_wait = int(
+ self.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.END_WAIT_TIMEOUT, fallback="0")
+ )
start_time = timezone.utcnow()
while True:
if time_to_wait:
diff --git a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
index d1265b33a0b..8f0c24c259b 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
@@ -36,7 +36,7 @@ from airflow.version import version as airflow_version_str
from tests_common.test_utils.compat import timezone
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS,
AIRFLOW_V_3_1_PLUS
airflow_version = VersionInfo(*map(int, airflow_version_str.split(".")[:3]))
@@ -1028,3 +1028,88 @@ class TestAwsLambdaExecutor:
"Terminating Lambda executor. In-flight tasks cannot be stopped."
)
assert len(mock_executor.running_tasks) == 1
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_1_PLUS, reason="Multi-team support
requires Airflow 3.1+")
+ def test_team_config(self):
+ """Test that the executor uses team-specific configuration when
provided via self.conf."""
+ from unittest.mock import patch
+
+ # Team name to be used throughout
+ team_name = "team_a"
+ # Patch environment to include two sets of configs for the Lambda executor. One that is related to a
+ # team and one that is not. Then we will create two executors (one with a team and one without) and
+ # ensure the correct configs are used.
+ config_overrides = [
+
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.FUNCTION_NAME}",
"global-function"),
+ (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUEUE_URL}",
"global-queue-url"),
+ (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.DLQ_URL}",
"global-dlq-url"),
+ (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUALIFIER}",
"global-qualifier"),
+
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.REGION_NAME}",
"us-west-1"),
+
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.AWS_CONN_ID}",
"aws_default"),
+
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS}",
"3"),
+ (
+
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP}",
+ "False",
+ ),
+ # Team Config
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.FUNCTION_NAME}",
+ "team_a_function",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUEUE_URL}",
+ "team_a_queue_url",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.DLQ_URL}",
+ "team_a_dlq_url",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUALIFIER}",
+ "team_a_qualifier",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.REGION_NAME}",
+ "us-west-2",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS}",
+ "5",
+ ),
+ (
+
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP}",
+ "True",
+ ),
+ ]
+ with patch("os.environ", {key.upper(): value for key, value in
config_overrides}):
+ # Create a team-specific executor
+ team_executor = AwsLambdaExecutor(team_name=team_name)
+
+ assert team_executor.lambda_function_name == "team_a_function"
+ assert team_executor.sqs_queue_url == "team_a_queue_url"
+ assert team_executor.dlq_url == "team_a_dlq_url"
+ assert team_executor.qualifier == "team_a_qualifier"
+ assert team_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS) == "5"
+ assert team_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.REGION_NAME) == "us-west-2"
+ assert team_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.AWS_CONN_ID) == "aws_default"
+ assert (
+ team_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+ == "True"
+ )
+
+ # Now create an executor without a team and ensure the global configs are used
+ global_executor = AwsLambdaExecutor()
+
+ assert global_executor.lambda_function_name == "global-function"
+ assert global_executor.sqs_queue_url == "global-queue-url"
+ assert global_executor.dlq_url == "global-dlq-url"
+ assert global_executor.qualifier == "global-qualifier"
+ assert global_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS) == "3"
+ assert global_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.REGION_NAME) == "us-west-1"
+ assert (
+ global_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.AWS_CONN_ID) == "aws_default"
+ )
+ assert (
+ global_executor.conf.get(CONFIG_GROUP_NAME,
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+ == "False"
+ )