This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 29358761fe6 AwsLambdaExecutor: Support multi-team configuration (#61321)
29358761fe6 is described below

commit 29358761fe6c76a0dcbc921beff4404bc970a274
Author: Henry Chen <[email protected]>
AuthorDate: Fri Feb 20 04:25:27 2026 +0800

    AwsLambdaExecutor: Support multi-team configuration (#61321)
    
    Enable AwsLambdaExecutor to read team-based config correctly
---
 .../aws/executors/aws_lambda/lambda_executor.py    | 35 ++++++---
 .../executors/aws_lambda/test_lambda_executor.py   | 87 +++++++++++++++++++++-
 2 files changed, 111 insertions(+), 11 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
index 67b23e0f0aa..e1cf8a81244 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
@@ -41,7 +41,7 @@ from airflow.providers.amazon.aws.executors.utils.exponential_backoff_retry impo
 from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
 from airflow.providers.amazon.aws.hooks.sqs import SqsHook
 from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS
-from airflow.providers.common.compat.sdk import AirflowException, Stats, conf, timezone
+from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
@@ -61,6 +61,8 @@ class AwsLambdaExecutor(BaseExecutor):
     to update task state in Airflow.
     """
 
+    supports_multi_team: bool = True
+
     if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
         # In the v3 path, we store workloads, not commands as strings.
         # TODO: TaskSDK: move this type change into BaseExecutor
@@ -70,12 +72,23 @@ class AwsLambdaExecutor(BaseExecutor):
         super().__init__(*args, **kwargs)
         self.pending_tasks: deque = deque()
         self.running_tasks: dict[str, TaskInstanceKey] = {}
-        self.lambda_function_name = conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.FUNCTION_NAME)
-        self.sqs_queue_url = conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.QUEUE_URL)
-        self.dlq_url = conf.get(CONFIG_GROUP_NAME, AllLambdaConfigKeys.DLQ_URL)
-        self.qualifier = conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.QUALIFIER, fallback=None)
+
+        # Check if self has the ExecutorConf set on the self.conf attribute, and if not, set it to the global
+        # configuration object. This allows the changes to be backwards compatible with older versions of
+        # Airflow.
+        # Can be removed when minimum supported provider version is equal to the version of core airflow
+        # which introduces multi-team configuration.
+        if not hasattr(self, "conf"):
+            from airflow.providers.common.compat.sdk import conf
+
+            self.conf = conf
+
+        self.lambda_function_name = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.FUNCTION_NAME)
+        self.sqs_queue_url = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.QUEUE_URL)
+        self.dlq_url = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.DLQ_URL)
+        self.qualifier = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.QUALIFIER, fallback=None)
         # Maximum number of retries to invoke Lambda.
-        self.max_invoke_attempts = conf.get(
+        self.max_invoke_attempts = self.conf.get(
             CONFIG_GROUP_NAME,
             AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS,
         )
@@ -86,7 +99,7 @@ class AwsLambdaExecutor(BaseExecutor):
 
     def start(self):
         """Call this when the Executor is run for the first time by the 
scheduler."""
-        check_health = conf.getboolean(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+        check_health = self.conf.getboolean(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
 
         if not check_health:
             return
@@ -152,8 +165,8 @@ class AwsLambdaExecutor(BaseExecutor):
         :param check_connection: If True, check the health of the connection 
after loading it.
         """
         self.log.info("Loading Connections")
-        aws_conn_id = conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.AWS_CONN_ID)
-        region_name = conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.REGION_NAME, fallback=None)
+        aws_conn_id = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.AWS_CONN_ID)
+        region_name = self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.REGION_NAME, fallback=None)
         self.sqs_client = SqsHook(aws_conn_id=aws_conn_id, 
region_name=region_name).conn
         self.lambda_client = LambdaHook(aws_conn_id=aws_conn_id, 
region_name=region_name).conn
 
@@ -492,7 +505,9 @@ class AwsLambdaExecutor(BaseExecutor):
         :param heartbeat_interval: The interval in seconds to wait between 
checks for task completion.
         """
         self.log.info("Received signal to end, waiting for outstanding tasks 
to finish.")
-        time_to_wait = int(conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.END_WAIT_TIMEOUT, fallback="0"))
+        time_to_wait = int(
+            self.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.END_WAIT_TIMEOUT, fallback="0")
+        )
         start_time = timezone.utcnow()
         while True:
             if time_to_wait:
diff --git a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
index d1265b33a0b..8f0c24c259b 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py
@@ -36,7 +36,7 @@ from airflow.version import version as airflow_version_str
 
 from tests_common.test_utils.compat import timezone
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
 
 airflow_version = VersionInfo(*map(int, airflow_version_str.split(".")[:3]))
 
@@ -1028,3 +1028,88 @@ class TestAwsLambdaExecutor:
             "Terminating Lambda executor. In-flight tasks cannot be stopped."
         )
         assert len(mock_executor.running_tasks) == 1
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_1_PLUS, reason="Multi-team support 
requires Airflow 3.1+")
+    def test_team_config(self):
+        """Test that the executor uses team-specific configuration when 
provided via self.conf."""
+        from unittest.mock import patch
+
+        # Team name to be used throughout
+        team_name = "team_a"
+        # Patch environment to include two sets of configs for the Lambda 
executor. One that is related to a
+        # team and one that is not. Then we will create two executors (one 
with a team and one without) and
+        # ensure the correct configs are used.
+        config_overrides = [
+            
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.FUNCTION_NAME}", 
"global-function"),
+            (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUEUE_URL}", 
"global-queue-url"),
+            (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.DLQ_URL}", 
"global-dlq-url"),
+            (f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUALIFIER}", 
"global-qualifier"),
+            
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.REGION_NAME}", 
"us-west-1"),
+            
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.AWS_CONN_ID}", 
"aws_default"),
+            
(f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS}", 
"3"),
+            (
+                
f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP}",
+                "False",
+            ),
+            # Team Config
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.FUNCTION_NAME}",
+                "team_a_function",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUEUE_URL}",
+                "team_a_queue_url",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.DLQ_URL}",
+                "team_a_dlq_url",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.QUALIFIER}",
+                "team_a_qualifier",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.REGION_NAME}",
+                "us-west-2",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS}",
+                "5",
+            ),
+            (
+                
f"AIRFLOW__{team_name}___{CONFIG_GROUP_NAME}__{AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP}",
+                "True",
+            ),
+        ]
+        with patch("os.environ", {key.upper(): value for key, value in 
config_overrides}):
+            # Create a team-specific executor
+            team_executor = AwsLambdaExecutor(team_name=team_name)
+
+            assert team_executor.lambda_function_name == "team_a_function"
+            assert team_executor.sqs_queue_url == "team_a_queue_url"
+            assert team_executor.dlq_url == "team_a_dlq_url"
+            assert team_executor.qualifier == "team_a_qualifier"
+            assert team_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS) == "5"
+            assert team_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.REGION_NAME) == "us-west-2"
+            assert team_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.AWS_CONN_ID) == "aws_default"
+            assert (
+                team_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+                == "True"
+            )
+
+            # Now create an executor without a team and ensure the global 
configs are used
+            global_executor = AwsLambdaExecutor()
+
+            assert global_executor.lambda_function_name == "global-function"
+            assert global_executor.sqs_queue_url == "global-queue-url"
+            assert global_executor.dlq_url == "global-dlq-url"
+            assert global_executor.qualifier == "global-qualifier"
+            assert global_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.MAX_INVOKE_ATTEMPTS) == "3"
+            assert global_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.REGION_NAME) == "us-west-1"
+            assert (
+                global_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.AWS_CONN_ID) == "aws_default"
+            )
+            assert (
+                global_executor.conf.get(CONFIG_GROUP_NAME, 
AllLambdaConfigKeys.CHECK_HEALTH_ON_STARTUP)
+                == "False"
+            )

Reply via email to