This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fba17703b7 Fix(http) bug, pass request_kwargs to HttpHooK.run (#43459)
fba17703b7 is described below
commit fba17703b7e745a7a8574693f75f7b46dec13222
Author: childe <[email protected]>
AuthorDate: Tue Oct 29 18:48:45 2024 +0800
Fix(http) bug, pass request_kwargs to HttpHooK.run (#43459)
* Fix(http) bug, pass request_kwargs to HttpHooK.run
HttpHook.run() accepts request_kwargs; however, HttpOperator and HttpSensor
do not pass request_kwargs to HttpHook.run().
* Format code for Ruff.format static check
---
.../src/airflow/providers/http/operators/http.py | 21 ++++++++++++++++++---
.../src/airflow/providers/http/sensors/http.py | 3 +++
2 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/providers/src/airflow/providers/http/operators/http.py b/providers/src/airflow/providers/http/operators/http.py
index 0ec9fd136c..358294af98 100644
--- a/providers/src/airflow/providers/http/operators/http.py
+++ b/providers/src/airflow/providers/http/operators/http.py
@@ -114,6 +114,7 @@ class HttpOperator(BaseOperator):
response_check: Callable[..., bool] | None = None,
response_filter: Callable[..., Any] | None = None,
extra_options: dict[str, Any] | None = None,
+ request_kwargs: dict[str, Any] | None = None,
http_conn_id: str = "http_default",
log_response: bool = False,
auth_type: type[AuthBase] | None = None,
@@ -143,6 +144,7 @@ class HttpOperator(BaseOperator):
self.tcp_keep_alive_interval = tcp_keep_alive_interval
self.deferrable = deferrable
self.retry_args = retry_args
+ self.request_kwargs = request_kwargs or {}
@property
def hook(self) -> HttpHook:
@@ -173,10 +175,21 @@ class HttpOperator(BaseOperator):
self.log.info("Calling HTTP method")
if self.retry_args:
response = self.hook.run_with_advanced_retry(
- self.retry_args, self.endpoint, self.data, self.headers, self.extra_options
+ self.retry_args,
+ self.endpoint,
+ self.data,
+ self.headers,
+ self.extra_options,
+ **self.request_kwargs,
)
else:
- response = self.hook.run(self.endpoint, self.data, self.headers, self.extra_options)
+ response = self.hook.run(
+ self.endpoint,
+ self.data,
+ self.headers,
+ self.extra_options,
+ **self.request_kwargs,
+ )
response = self.paginate_sync(response=response)
return self.process_response(context=context, response=response)
@@ -191,7 +204,8 @@ class HttpOperator(BaseOperator):
break
if self.retry_args:
response = self.hook.run_with_advanced_retry(
- self.retry_args, **self._merge_next_page_parameters(next_page_params)
+ self.retry_args,
+ **self._merge_next_page_parameters(next_page_params),
)
else:
response = self.hook.run(**self._merge_next_page_parameters(next_page_params))
@@ -304,6 +318,7 @@ class HttpOperator(BaseOperator):
data=data,
headers=merge_dicts(self.headers, next_page_params.get("headers", {})),
extra_options=merge_dicts(self.extra_options, next_page_params.get("extra_options", {})),
+ **self.request_kwargs,
)
diff --git a/providers/src/airflow/providers/http/sensors/http.py b/providers/src/airflow/providers/http/sensors/http.py
index 33b5e1d4de..ef0d9fd380 100644
--- a/providers/src/airflow/providers/http/sensors/http.py
+++ b/providers/src/airflow/providers/http/sensors/http.py
@@ -94,6 +94,7 @@ class HttpSensor(BaseSensorOperator):
http_conn_id: str = "http_default",
method: str = "GET",
request_params: dict[str, Any] | None = None,
+ request_kwargs: dict[str, Any] | None = None,
headers: dict[str, Any] | None = None,
response_error_codes_allowlist: list[str] | None = None,
response_check: Callable[..., bool] | None = None,
@@ -121,6 +122,7 @@ class HttpSensor(BaseSensorOperator):
self.tcp_keep_alive_count = tcp_keep_alive_count
self.tcp_keep_alive_interval = tcp_keep_alive_interval
self.deferrable = deferrable
+ self.request_kwargs = request_kwargs or {}
def poke(self, context: Context) -> bool:
from airflow.utils.operator_helpers import determine_kwargs
@@ -141,6 +143,7 @@ class HttpSensor(BaseSensorOperator):
data=self.request_params,
headers=self.headers,
extra_options=self.extra_options,
+ **self.request_kwargs,
)
if self.response_check: