eladkal commented on code in PR #47719:
URL: https://github.com/apache/airflow/pull/47719#discussion_r1993324662
##########
providers/http/tests/unit/http/sensors/test_http.py:
##########
@@ -370,15 +377,83 @@ def test_execute_is_deferred(self, mock_poke):
assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is
not a HttpTrigger"
- @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
- @mock.patch("airflow.sensors.base.BaseSensorOperator.execute")
- def test_execute_not_defer_when_response_check_is_not_none(self,
mock_execute, mock_defer):
+ @mock.patch(
+ "airflow.providers.http.sensors.http.HttpSensor.poke",
+ return_value=False,
+ )
+ def test_execute_defer_when_response_check_is_not_none(self, mock_poke):
task = HttpSensor(
task_id="run_now",
endpoint="test-endpoint",
response_check=lambda response: "httpbin" in response.text,
deferrable=True,
)
- task.execute({})
- mock_execute.assert_called_once()
- mock_defer.assert_not_called()
+ with pytest.raises(TaskDeferred) as exc:
+ task.execute({})
+ assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is
not a HttpTrigger"
+
+ @mock.patch("airflow.providers.http.sensors.http.HttpSensor.log")
+ def test_execute_complete_success_without_response_check(self, mock_log):
+ """
+ Test execute_complete when event status is 'success' and no
response_check is provided.
+ """
+ task = HttpSensor(task_id="test_http_sensor", endpoint="/test")
+ context = mock.Mock()
+ success_event = {
+ "status": "success",
+ "response": base64.standard_b64encode(cloudpickle.dumps({"key":
"value"})).decode("utf-8"),
+ }
+
+ task.response_check = None
+ task.execute_complete(context=context, event=success_event)
+ mock_log.info.assert_any_call("%s completed successfully.",
task.task_id)
+
+ @mock.patch("airflow.providers.http.sensors.http.HttpSensor.log")
+ def test_execute_complete_success_with_response_check(self, mock_log):
+ """
+ Test execute_complete when event status is 'success' and
response_check passes.
+ """
+ task = HttpSensor(task_id="test_http_sensor", endpoint="/test")
+ context = mock.Mock()
+ success_event = {
+ "status": "success",
+ "response": base64.standard_b64encode(cloudpickle.dumps({"key":
"value"})).decode("utf-8"),
+ }
+
+ task.response_check = mock.Mock(return_value=True)
+ with mock.patch.object(task, "process_response", return_value=True) as
mock_process_response:
+ task.execute_complete(context=context, event=success_event)
+ mock_process_response.assert_called_once_with(context=context,
response={"key": "value"})
+ mock_log.info.assert_any_call("response_check condition is matched
for %s", task.task_id)
+ mock_log.info.assert_any_call("%s completed successfully.",
task.task_id)
+
+ def test_execute_complete_failure(self):
+ """
+ Test execute_complete when event status is 'failure'.
+ """
+ task = HttpSensor(task_id="test_http_sensor", endpoint="/test")
+ context = mock.Mock()
+ failure_event = {"status": "failure", "message": "An error occurred"}
+
+ with pytest.raises(AirflowException, match="Unexpected error in the
operation: An error occurred"):
+ task.execute_complete(context=context, event=failure_event)
+
+ @mock.patch("airflow.providers.http.sensors.http.HttpSensor.log")
+ def test_execute_complete_response_check_failure(self, mock_log):
+ """
+ Test execute_complete when response_check fails.
+ """
+ task = HttpSensor(task_id="test_http_sensor", endpoint="/test")
+ context = mock.Mock()
+ success_event = {
+ "status": "success",
+ "response": base64.standard_b64encode(cloudpickle.dumps({"key":
"value"})).decode("utf-8"),
+ }
+
+ task.response_check = mock.Mock(return_value=True)
+ with mock.patch.object(task, "process_response", return_value=False):
+ with pytest.raises(
+ ResponseCheckFailedException,
Review Comment:
```suggestion
AirflowException,
```
##########
providers/http/tests/unit/http/sensors/test_http.py:
##########
@@ -17,13 +17,20 @@
# under the License.
from __future__ import annotations
+import base64
from unittest import mock
from unittest.mock import patch
+import cloudpickle
import pytest
import requests
-from airflow.exceptions import AirflowException, AirflowSensorTimeout,
TaskDeferred
+from airflow.exceptions import (
+ AirflowException,
+ AirflowSensorTimeout,
+ ResponseCheckFailedException,
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]