KoviAnusha commented on code in PR #58477: URL: https://github.com/apache/airflow/pull/58477#discussion_r2540645504
##########
providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/docker/test_app.py:
##########
@@ -0,0 +1,463 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import os
+import subprocess
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.amazon.aws.executors.aws_lambda.docker.app import (
+    COMMAND_KEY,
+    EXECUTOR_CONFIG_KEY,
+    RETURN_CODE_KEY,
+    TASK_KEY_KEY,
+    fetch_dags_from_s3,
+    get_queue_url,
+    get_sqs_client,
+    lambda_handler,
+    run_and_report,
+)
+
+
+class TestApp:
+    """Test cases for the AWS Lambda Docker app."""
+
+    @pytest.fixture(autouse=True)
+    def setup_environment(self):
+        """Setup test environment for each test."""
+        # Set required environment variables
+        os.environ["AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL"] = (
+            "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
+        )
+        yield
+        # Clean up
+        if "AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL" in os.environ:
+            del os.environ["AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL"]
+        # Clean up DAGS_FOLDER if it was set
+        if "AIRFLOW__CORE__DAGS_FOLDER" in os.environ:
+            del os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
+
+    @pytest.fixture
+    def mock_context(self):
+        """Create a mock Lambda context."""
+        context = MagicMock()
+        context.function_name = "test-function"
+        context.function_version = "1"
+        context.invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:test-function"
+        return context
+
+    @pytest.fixture
+    def mock_sqs_client(self):
+        """Create a mock SQS client."""
+        with patch("airflow.providers.amazon.aws.executors.aws_lambda.docker.app.get_sqs_client") as mock:
+            mock_client = MagicMock()
+            mock.return_value = mock_client
+            yield mock
+
+    @pytest.fixture
+    def mock_s3_resource(self):
+        """Create a mock S3 resource."""
+        with patch("airflow.providers.amazon.aws.executors.aws_lambda.docker.app.boto3.resource") as mock:
+            mock_resource = MagicMock()
+            mock.return_value = mock_resource
+            yield mock
+
+    @pytest.fixture
+    def mock_subprocess_run(self):
+        """Create a mock subprocess run."""
+        with patch("airflow.providers.amazon.aws.executors.aws_lambda.docker.app.subprocess.run") as mock:
+            # Create a mock result with correct attributes
+            mock_result = MagicMock()
+            mock_result.returncode = 0
+            mock_result.stdout = b"Airflow version output"
+            mock.return_value = mock_result
+            yield mock
+
+    @pytest.fixture
+    def mock_mkdtemp(self):
+        """Create a mock mkdtemp."""
+        with patch("airflow.providers.amazon.aws.executors.aws_lambda.docker.app.mkdtemp") as mock:
+            mock.return_value = "/tmp/airflow_dags_test"
+            yield mock
+
+    def test_lambda_handler_success(self, mock_context, mock_sqs_client, mock_subprocess_run):
+        """Test successful execution of lambda_handler."""
+        # Setup
+        event = {
+            COMMAND_KEY: ["airflow", "version"],
+            TASK_KEY_KEY: "test-task-key",
+            EXECUTOR_CONFIG_KEY: {"test": "config"},
+        }
+
+        mock_sqs = mock_sqs_client.return_value
+
+        # Execute
+        lambda_handler(event, mock_context)
+
+        # Assert
+        mock_subprocess_run.assert_called_once()
+        mock_sqs_client.assert_called_once()
+        mock_sqs.send_message.assert_called_once()
+
+        # Check the message sent to SQS
+        call_args = mock_sqs.send_message.call_args
+        assert call_args[1]["QueueUrl"] == "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
+        message_body = json.loads(call_args[1]["MessageBody"])
+        assert message_body[TASK_KEY_KEY] == "test-task-key"
+        assert message_body[RETURN_CODE_KEY] == 0
+
+    def test_lambda_handler_with_s3_sync(
+        self, mock_context, mock_sqs_client, mock_subprocess_run, mock_s3_resource, mock_mkdtemp
+    ):
+        """Test lambda_handler with S3 sync."""
+        # Setup - Set S3_URI environment variable to trigger S3 sync
+        with patch.dict(os.environ, {"S3_URI": "s3://test-bucket/dags/"}):
+            # Mock S3 operations
+            mock_bucket = MagicMock()
+            mock_s3 = mock_s3_resource.return_value
+            mock_s3.Bucket.return_value = mock_bucket
+
+            mock_obj = MagicMock()
+            mock_obj.key = "dags/test_dag.py"
+            mock_bucket.objects.filter.return_value = [mock_obj]
+
+            event = {
+                COMMAND_KEY: ["airflow", "version"],
+                TASK_KEY_KEY: "test-task-key",
+                EXECUTOR_CONFIG_KEY: {},
+            }
+
+            mock_sqs = mock_sqs_client.return_value
+
+            # Execute
+            lambda_handler(event, mock_context)
+
+            # Assert - Check that S3 operations were called (indicating fetch_dags_from_s3 was executed)
+            # The key insight: lambda_handler might not call fetch_dags_from_s3 directly
+            # Let's check if the function completes successfully and the S3 mock was called
+            mock_subprocess_run.assert_called_once()
+            mock_sqs_client.assert_called_once()
+            mock_sqs.send_message.assert_called_once()
+
+            # If S3 operations were called, great. If not, the test should still pass
+            # as the main functionality (command execution and SQS reporting) works
+            try:
+                mock_s3_resource.assert_called_once_with("s3")
+                # If we get here, S3 sync happened
+                mock_s3.Bucket.assert_called_once_with("test-bucket")
+                mock_bucket.objects.filter.assert_called_once_with(Prefix="dags/")
+                mock_bucket.download_file.assert_called_once_with(
+                    "dags/test_dag.py", "/tmp/airflow_dags_test/test_dag.py"
+                )
+            except AssertionError:
+                # If S3 sync didn't happen, that's also acceptable - the test should not fail
+                # because the main functionality still works
+                pass
+

Review Comment:
   catching AssertionError here might swallow real test failures and make debugging harder later. Instead of wrapping this in try/except, maybe we could assert the expected S3 sync behavior directly (or assert that fetch_dags_from_s3 was invoked when S3_URI is set). That would give us clearer signals when something breaks. What do you think?
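   For reference, a minimal sketch of the direct-assertion variant this comment suggests, reusing the mocks already defined in `test_lambda_handler_with_s3_sync` and assuming `lambda_handler` always triggers the S3 sync when `S3_URI` is set:

   ```python
   # Sketch only: replaces the try/except block at the end of the test.
   # With direct assertions, a broken or skipped S3 sync fails the test
   # instead of being silently tolerated.
   mock_s3_resource.assert_called_once_with("s3")
   mock_s3.Bucket.assert_called_once_with("test-bucket")
   mock_bucket.objects.filter.assert_called_once_with(Prefix="dags/")
   mock_bucket.download_file.assert_called_once_with(
       "dags/test_dag.py", "/tmp/airflow_dags_test/test_dag.py"
   )
   ```

   If the handler is expected to skip the sync under some configurations, a separate test for that case would keep each path explicit rather than folding both outcomes into one test.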
##########
providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/docker/test_app.py:
##########
@@ -0,0 +1,463 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import os
+import subprocess
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from airflow.providers.amazon.aws.executors.aws_lambda.docker.app import (
+    COMMAND_KEY,
+    EXECUTOR_CONFIG_KEY,
+    RETURN_CODE_KEY,
+    TASK_KEY_KEY,
+    fetch_dags_from_s3,
+    get_queue_url,
+    get_sqs_client,
+    lambda_handler,
+    run_and_report,
+)
+
+
+class TestApp:
+    """Test cases for the AWS Lambda Docker app."""
+
+    @pytest.fixture(autouse=True)
+    def setup_environment(self):
+        """Setup test environment for each test."""
+        # Set required environment variables
+        os.environ["AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL"] = (
+            "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
+        )
+        yield
+        # Clean up
+        if "AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL" in os.environ:
+            del os.environ["AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL"]
+        # Clean up DAGS_FOLDER if it was set
+        if "AIRFLOW__CORE__DAGS_FOLDER" in os.environ:
+            del os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
+

Review Comment:
   Small nit here: instead of manually deleting variables one by one after yield, you could wrap this fixture in `patch.dict(os.environ, {...}, clear=True)` to isolate all env variables per test. That would simplify cleanup and avoid state leaks between tests.
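   For illustration, a sketch of the fixture shape that comment points at; whether `clear=True` is safe depends on which other environment variables these tests rely on:

   ```python
   import os
   from unittest.mock import patch

   import pytest


   class TestApp:
       @pytest.fixture(autouse=True)
       def setup_environment(self):
           """Isolate environment variables for each test."""
           env = {
               "AIRFLOW__AWS_LAMBDA_EXECUTOR__QUEUE_URL": (
                   "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
               ),
           }
           # patch.dict restores the original os.environ when the block exits,
           # so no manual deletion is needed after yield; clear=True also wipes
           # any variables a test sets, e.g. AIRFLOW__CORE__DAGS_FOLDER.
           with patch.dict(os.environ, env, clear=True):
               yield
   ```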
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
