This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fba17703b7 Fix(http) bug, pass request_kwargs to HttpHook.run (#43459)
fba17703b7 is described below

commit fba17703b7e745a7a8574693f75f7b46dec13222
Author: childe <[email protected]>
AuthorDate: Tue Oct 29 18:48:45 2024 +0800

    Fix(http) bug, pass request_kwargs to HttpHook.run (#43459)
    
    * Fix(http) bug, pass request_kwargs to HttpHook.run
    
    HttpHook.run() accepts request_kwargs; however, HttpOperator and HttpSensor
    do not pass request_kwargs to HttpHook.run().
    
    * Format code for Ruff.format static check
---
 .../src/airflow/providers/http/operators/http.py    | 21 ++++++++++++++++++---
 .../src/airflow/providers/http/sensors/http.py      |  3 +++
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/providers/src/airflow/providers/http/operators/http.py b/providers/src/airflow/providers/http/operators/http.py
index 0ec9fd136c..358294af98 100644
--- a/providers/src/airflow/providers/http/operators/http.py
+++ b/providers/src/airflow/providers/http/operators/http.py
@@ -114,6 +114,7 @@ class HttpOperator(BaseOperator):
         response_check: Callable[..., bool] | None = None,
         response_filter: Callable[..., Any] | None = None,
         extra_options: dict[str, Any] | None = None,
+        request_kwargs: dict[str, Any] | None = None,
         http_conn_id: str = "http_default",
         log_response: bool = False,
         auth_type: type[AuthBase] | None = None,
@@ -143,6 +144,7 @@ class HttpOperator(BaseOperator):
         self.tcp_keep_alive_interval = tcp_keep_alive_interval
         self.deferrable = deferrable
         self.retry_args = retry_args
+        self.request_kwargs = request_kwargs or {}
 
     @property
     def hook(self) -> HttpHook:
@@ -173,10 +175,21 @@ class HttpOperator(BaseOperator):
         self.log.info("Calling HTTP method")
         if self.retry_args:
             response = self.hook.run_with_advanced_retry(
-                self.retry_args, self.endpoint, self.data, self.headers, self.extra_options
+                self.retry_args,
+                self.endpoint,
+                self.data,
+                self.headers,
+                self.extra_options,
+                **self.request_kwargs,
             )
         else:
-            response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
+            response = self.hook.run(
+                self.endpoint,
+                self.data,
+                self.headers,
+                self.extra_options,
+                **self.request_kwargs,
+            )
         response = self.paginate_sync(response=response)
         return self.process_response(context=context, response=response)
 
@@ -191,7 +204,8 @@ class HttpOperator(BaseOperator):
                 break
             if self.retry_args:
                 response = self.hook.run_with_advanced_retry(
-                    self.retry_args, **self._merge_next_page_parameters(next_page_params)
+                    self.retry_args,
+                    **self._merge_next_page_parameters(next_page_params),
                 )
             else:
                 response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
@@ -304,6 +318,7 @@ class HttpOperator(BaseOperator):
             data=data,
             headers=merge_dicts(self.headers, next_page_params.get("headers", {})),
             extra_options=merge_dicts(self.extra_options, next_page_params.get("extra_options", {})),
+            **self.request_kwargs,
         )
 
 
diff --git a/providers/src/airflow/providers/http/sensors/http.py b/providers/src/airflow/providers/http/sensors/http.py
index 33b5e1d4de..ef0d9fd380 100644
--- a/providers/src/airflow/providers/http/sensors/http.py
+++ b/providers/src/airflow/providers/http/sensors/http.py
@@ -94,6 +94,7 @@ class HttpSensor(BaseSensorOperator):
         http_conn_id: str = "http_default",
         method: str = "GET",
         request_params: dict[str, Any] | None = None,
+        request_kwargs: dict[str, Any] | None = None,
         headers: dict[str, Any] | None = None,
         response_error_codes_allowlist: list[str] | None = None,
         response_check: Callable[..., bool] | None = None,
@@ -121,6 +122,7 @@ class HttpSensor(BaseSensorOperator):
         self.tcp_keep_alive_count = tcp_keep_alive_count
         self.tcp_keep_alive_interval = tcp_keep_alive_interval
         self.deferrable = deferrable
+        self.request_kwargs = request_kwargs or {}
 
     def poke(self, context: Context) -> bool:
         from airflow.utils.operator_helpers import determine_kwargs
@@ -141,6 +143,7 @@ class HttpSensor(BaseSensorOperator):
                 data=self.request_params,
                 headers=self.headers,
                 extra_options=self.extra_options,
+                **self.request_kwargs,
             )
 
             if self.response_check:

Reply via email to