Lee-W commented on code in PR #44557:
URL: https://github.com/apache/airflow/pull/44557#discussion_r1872889956
##########
providers/src/airflow/providers/http/sensors/http.py:
##########
@@ -177,5 +181,43 @@ def execute(self, context: Context) -> None:
method_name="execute_complete",
)
- def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
- self.log.info("%s completed successfully.", self.task_id)
+ @staticmethod
+ def _default_response_maker(response: Response | list[Response]) ->
Callable:
+ """
+ Create a default response maker function based on the type of response.
+
+ :param response: The response object or list of response objects.
+ :return: A function that returns response text(s).
+ """
+ if isinstance(response, Response):
+ response_object = response # Makes mypy happy
+ return lambda: response_object.text
+
+ response_list: list[Response] = response # Makes mypy happy
+ return lambda: [entry.text for entry in response_list]
+
+ def process_response(self, context: Context, response: Response |
list[Response]) -> Any:
+ """Process the response."""
+ from airflow.utils.operator_helpers import determine_kwargs
+
+ make_default_response: Callable =
self._default_response_maker(response=response)
+
+ if self.response_check:
+ kwargs = determine_kwargs(self.response_check, [response], context)
+ if not self.response_check(response, **kwargs):
+ raise AirflowException("Response check returned False.")
+ return make_default_response()
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ if event["status"] == "success":
Review Comment:
we could dedent the whole block by using `if event["status"] != ["success"]`
##########
providers/src/airflow/providers/http/sensors/http.py:
##########
@@ -177,5 +181,43 @@ def execute(self, context: Context) -> None:
method_name="execute_complete",
)
- def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
- self.log.info("%s completed successfully.", self.task_id)
+ @staticmethod
+ def _default_response_maker(response: Response | list[Response]) ->
Callable:
+ """
+ Create a default response maker function based on the type of response.
+
+ :param response: The response object or list of response objects.
+ :return: A function that returns response text(s).
+ """
+ if isinstance(response, Response):
+ response_object = response # Makes mypy happy
+ return lambda: response_object.text
+
+ response_list: list[Response] = response # Makes mypy happy
+ return lambda: [entry.text for entry in response_list]
+
+ def process_response(self, context: Context, response: Response |
list[Response]) -> Any:
Review Comment:
We probably would like to add some test case to these methods
##########
providers/src/airflow/providers/http/sensors/http.py:
##########
@@ -177,5 +181,43 @@ def execute(self, context: Context) -> None:
method_name="execute_complete",
)
- def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
- self.log.info("%s completed successfully.", self.task_id)
+ @staticmethod
+ def _default_response_maker(response: Response | list[Response]) ->
Callable:
+ """
+ Create a default response maker function based on the type of response.
+
+ :param response: The response object or list of response objects.
+ :return: A function that returns response text(s).
+ """
+ if isinstance(response, Response):
+ response_object = response # Makes mypy happy
+ return lambda: response_object.text
+
+ response_list: list[Response] = response # Makes mypy happy
+ return lambda: [entry.text for entry in response_list]
+
+ def process_response(self, context: Context, response: Response |
list[Response]) -> Any:
+ """Process the response."""
+ from airflow.utils.operator_helpers import determine_kwargs
+
+ make_default_response: Callable =
self._default_response_maker(response=response)
+
+ if self.response_check:
+ kwargs = determine_kwargs(self.response_check, [response], context)
+ if not self.response_check(response, **kwargs):
+ raise AirflowException("Response check returned False.")
+ return make_default_response()
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ if event["status"] == "success":
+ response = event["response"]
+ if self.response_check:
+ retrieved_data =
cloudpickle.loads(base64.standard_b64decode(response))
+ if self.process_response(context=context,
response=retrieved_data):
+ self.log.info("response_check condition is matched for
%s", self.task_id)
+ else:
+ raise AirflowException("response_check condition is not
matched for %s", self.task_id)
Review Comment:
Would be better if we could create a dedicate exception instead of using
AirflowException
##########
providers/src/airflow/providers/http/sensors/http.py:
##########
@@ -177,5 +181,43 @@ def execute(self, context: Context) -> None:
method_name="execute_complete",
)
- def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
- self.log.info("%s completed successfully.", self.task_id)
+ @staticmethod
+ def _default_response_maker(response: Response | list[Response]) ->
Callable:
+ """
+ Create a default response maker function based on the type of response.
+
+ :param response: The response object or list of response objects.
+ :return: A function that returns response text(s).
+ """
+ if isinstance(response, Response):
+ response_object = response # Makes mypy happy
+ return lambda: response_object.text
+
+ response_list: list[Response] = response # Makes mypy happy
+ return lambda: [entry.text for entry in response_list]
+
+ def process_response(self, context: Context, response: Response |
list[Response]) -> Any:
+ """Process the response."""
+ from airflow.utils.operator_helpers import determine_kwargs
+
+ make_default_response: Callable =
self._default_response_maker(response=response)
+
+ if self.response_check:
+ kwargs = determine_kwargs(self.response_check, [response], context)
+ if not self.response_check(response, **kwargs):
+ raise AirflowException("Response check returned False.")
+ return make_default_response()
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) ->
None:
+ if event["status"] == "success":
+ response = event["response"]
+ if self.response_check:
+ retrieved_data =
cloudpickle.loads(base64.standard_b64decode(response))
+ if self.process_response(context=context,
response=retrieved_data):
+ self.log.info("response_check condition is matched for
%s", self.task_id)
+ else:
+ raise AirflowException("response_check condition is not
matched for %s", self.task_id)
Review Comment:
```suggestion
if not self.process_response(context=context,
response=retrieved_data):
raise AirflowException("response_check condition is not
matched for %s", self.task_id)
self.log.info("response_check condition is matched for %s",
self.task_id)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]