This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 31d4a4a5 refactor(rest): switch HttpClient to use connection pool (#530)
31d4a4a5 is described below
commit 31d4a4a5808b5868f2636d96e6025b6dea16e2bd
Author: Feiyang Li <[email protected]>
AuthorDate: Mon Jan 26 22:48:09 2026 +0800
refactor(rest): switch HttpClient to use connection pool (#530)
---
src/iceberg/catalog/rest/http_client.cc | 97 +++++++--------------------------
src/iceberg/catalog/rest/http_client.h | 14 +----
2 files changed, 23 insertions(+), 88 deletions(-)
diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc
index 84d458b9..41be14ce 100644
--- a/src/iceberg/catalog/rest/http_client.cc
+++ b/src/iceberg/catalog/rest/http_client.cc
@@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response,
} // namespace
-void HttpClient::PrepareSession(
- const std::string& path, HttpMethod method,
- const std::unordered_map<std::string, std::string>& params,
- const std::unordered_map<std::string, std::string>& headers) {
- session_->SetUrl(cpr::Url{path});
- session_->SetParameters(GetParameters(params));
- session_->RemoveContent();
- // clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
- // to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
- // manually enforce HTTP GET to clear it.
- curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
- switch (method) {
- case HttpMethod::kGet:
- session_->PrepareGet();
- break;
- case HttpMethod::kPost:
- session_->PreparePost();
- break;
- case HttpMethod::kPut:
- session_->PreparePut();
- break;
- case HttpMethod::kDelete:
- session_->PrepareDelete();
- break;
- case HttpMethod::kHead:
- session_->PrepareHead();
- break;
- }
- auto final_headers = MergeHeaders(default_headers_, headers);
- session_->SetHeader(final_headers);
-}
-
HttpClient::HttpClient(std::unordered_map<std::string, std::string> default_headers)
: default_headers_{std::move(default_headers)},
- session_{std::make_unique<cpr::Session>()} {
+ connection_pool_{std::make_unique<cpr::ConnectionPool>()} {
// Set default Content-Type for all requests (including GET/HEAD/DELETE).
// Many systems require that content type is set regardless and will fail,
// even on an empty bodied request.
@@ -182,12 +150,9 @@ Result<HttpResponse> HttpClient::Get(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
- cpr::Response response;
- {
- std::lock_guard guard(session_mutex_);
- PrepareSession(path, HttpMethod::kGet, params, headers);
- response = session_->Get();
- }
+ auto final_headers = MergeHeaders(default_headers_, headers);
+ cpr::Response response =
+ cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_);
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
@@ -199,13 +164,9 @@ Result<HttpResponse> HttpClient::Post(
const std::string& path, const std::string& body,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
- cpr::Response response;
- {
- std::lock_guard guard(session_mutex_);
- PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
- session_->SetBody(cpr::Body{body});
- response = session_->Post();
- }
+ auto final_headers = MergeHeaders(default_headers_, headers);
+ cpr::Response response =
+ cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_);
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
@@ -218,25 +179,16 @@ Result<HttpResponse> HttpClient::PostForm(
const std::unordered_map<std::string, std::string>& form_data,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
- cpr::Response response;
-
- {
- std::lock_guard guard(session_mutex_);
-
- // Override default Content-Type (application/json) with form-urlencoded
- auto form_headers = headers;
- form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
-
- PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
- std::vector<cpr::Pair> pair_list;
- pair_list.reserve(form_data.size());
- for (const auto& [key, val] : form_data) {
- pair_list.emplace_back(key, val);
- }
- session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
-
- response = session_->Post();
+ auto final_headers = MergeHeaders(default_headers_, headers);
+ final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded);
+ std::vector<cpr::Pair> pair_list;
+ pair_list.reserve(form_data.size());
+ for (const auto& [key, val] : form_data) {
+ pair_list.emplace_back(key, val);
}
+ cpr::Response response =
+ cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()),
+ final_headers, *connection_pool_);
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
@@ -247,12 +199,8 @@ Result<HttpResponse> HttpClient::PostForm(
Result<HttpResponse> HttpClient::Head(
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
- cpr::Response response;
- {
- std::lock_guard guard(session_mutex_);
- PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
- response = session_->Head();
- }
+ auto final_headers = MergeHeaders(default_headers_, headers);
+ cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_);
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
@@ -264,12 +212,9 @@ Result<HttpResponse> HttpClient::Delete(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers,
const ErrorHandler& error_handler) {
- cpr::Response response;
- {
- std::lock_guard guard(session_mutex_);
- PrepareSession(path, HttpMethod::kDelete, params, headers);
- response = session_->Delete();
- }
+ auto final_headers = MergeHeaders(default_headers_, headers);
+ cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params),
+ final_headers, *connection_pool_);
ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
HttpResponse http_response;
diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h
index 84f8e590..38f902e4 100644
--- a/src/iceberg/catalog/rest/http_client.h
+++ b/src/iceberg/catalog/rest/http_client.h
@@ -21,11 +21,9 @@
#include <cstdint>
#include <memory>
-#include <mutex>
#include <string>
#include <unordered_map>
-#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"
@@ -34,7 +32,7 @@
/// \brief Http client for Iceberg REST API.
namespace cpr {
-class Session;
+class ConnectionPool;
} // namespace cpr
namespace iceberg::rest {
@@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient {
const ErrorHandler& error_handler);
private:
- void PrepareSession(const std::string& path, HttpMethod method,
- const std::unordered_map<std::string, std::string>& params,
- const std::unordered_map<std::string, std::string>& headers);
-
std::unordered_map<std::string, std::string> default_headers_;
-
- // TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent
- // calls
- std::unique_ptr<cpr::Session> session_;
- mutable std::mutex session_mutex_;
+ std::unique_ptr<cpr::ConnectionPool> connection_pool_;
};
} // namespace iceberg::rest