This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 72d3831e feat(rest): add initial oauth2 support to rest catalog (#577)
72d3831e is described below

commit 72d3831e7be25b86f04c508d70c3e17029e77d68
Author: lishuxu <[email protected]>
AuthorDate: Thu Mar 19 11:35:37 2026 +0800

    feat(rest): add initial oauth2 support to rest catalog (#577)
    
    Add OAuth2 authentication support for the REST catalog, including:
    - OAuth2Manager with static token and client_credentials grant flows
    
    TODO:
    - RefreshToken and ExchangeToken will be supported later
---
 src/iceberg/catalog/rest/CMakeLists.txt            |   2 +
 src/iceberg/catalog/rest/auth/auth_manager.cc      |  78 +++++++++-
 .../catalog/rest/auth/auth_manager_internal.h      |   5 +
 src/iceberg/catalog/rest/auth/auth_managers.cc     |   5 +-
 src/iceberg/catalog/rest/auth/auth_properties.cc   |  83 +++++++++++
 src/iceberg/catalog/rest/auth/auth_properties.h    |  93 ++++++++----
 src/iceberg/catalog/rest/auth/auth_session.cc      |  11 ++
 src/iceberg/catalog/rest/auth/auth_session.h       |  22 +++
 src/iceberg/catalog/rest/auth/oauth2_util.cc       |  77 ++++++++++
 src/iceberg/catalog/rest/auth/oauth2_util.h        |  56 +++++++
 src/iceberg/catalog/rest/http_client.cc            |   2 +-
 src/iceberg/catalog/rest/json_serde.cc             |  45 ++++++
 src/iceberg/catalog/rest/json_serde_internal.h     |   1 +
 src/iceberg/catalog/rest/meson.build               |   3 +
 src/iceberg/catalog/rest/rest_catalog.cc           |  13 +-
 src/iceberg/catalog/rest/type_fwd.h                |   3 +
 src/iceberg/catalog/rest/types.cc                  |  19 +++
 src/iceberg/catalog/rest/types.h                   |  17 +++
 src/iceberg/test/auth_manager_test.cc              | 163 +++++++++++++++++++++
 19 files changed, 663 insertions(+), 35 deletions(-)

diff --git a/src/iceberg/catalog/rest/CMakeLists.txt 
b/src/iceberg/catalog/rest/CMakeLists.txt
index 1da47d68..e91b1296 100644
--- a/src/iceberg/catalog/rest/CMakeLists.txt
+++ b/src/iceberg/catalog/rest/CMakeLists.txt
@@ -20,7 +20,9 @@ add_subdirectory(auth)
 set(ICEBERG_REST_SOURCES
     auth/auth_manager.cc
     auth/auth_managers.cc
+    auth/auth_properties.cc
     auth/auth_session.cc
+    auth/oauth2_util.cc
     catalog_properties.cc
     endpoint.cc
     error_handlers.cc
diff --git a/src/iceberg/catalog/rest/auth/auth_manager.cc 
b/src/iceberg/catalog/rest/auth/auth_manager.cc
index 14946aef..47370bd3 100644
--- a/src/iceberg/catalog/rest/auth/auth_manager.cc
+++ b/src/iceberg/catalog/rest/auth/auth_manager.cc
@@ -19,9 +19,12 @@
 
 #include "iceberg/catalog/rest/auth/auth_manager.h"
 
+#include <optional>
+
 #include "iceberg/catalog/rest/auth/auth_manager_internal.h"
 #include "iceberg/catalog/rest/auth/auth_properties.h"
 #include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/util/transform_util.h"
 
@@ -80,7 +83,8 @@ class BasicAuthManager : public AuthManager {
                      "Missing required property '{}'", 
AuthProperties::kBasicPassword);
     std::string credential = username_it->second + ":" + password_it->second;
     return AuthSession::MakeDefault(
-        {{"Authorization", "Basic " + 
TransformUtil::Base64Encode(credential)}});
+        {{std::string(kAuthorizationHeader),
+          "Basic " + TransformUtil::Base64Encode(credential)}});
   }
 };
 
@@ -90,4 +94,76 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
   return std::make_unique<BasicAuthManager>();
 }
 
+/// \brief Authentication manager implementing the OAuth2 flows of the REST catalog.
+///
+/// Handles a statically configured bearer token and the client_credentials grant.
+/// A token fetched while building the init session is cached and handed over to the
+/// next catalog session so that it is not requested twice.
+class OAuth2Manager : public AuthManager {
+ public:
+  /// \brief Build the session used for the initial config request.
+  ///
+  /// With a credential configured, a token is fetched and cached; otherwise the
+  /// configured token (if any) is sent as-is.
+  Result<std::shared_ptr<AuthSession>> InitSession(
+      HttpClient& init_client,
+      const std::unordered_map<std::string, std::string>& properties) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto auth_config,
+                            AuthProperties::FromProperties(properties));
+    // The init session is short-lived, so token refresh is switched off for it.
+    auth_config.Set(AuthProperties::kKeepRefreshed, false);
+
+    const bool has_credential = !auth_config.credential().empty();
+    if (!has_credential) {
+      // AuthHeaders() yields no headers for an empty token, so this single return
+      // covers both the static-token case and the unauthenticated case.
+      return AuthSession::MakeDefault(AuthHeaders(auth_config.token()));
+    }
+
+    // A credential wins over a static token: fetch a fresh token and remember it
+    // so that CatalogSession() can reuse it.
+    auto bootstrap_session =
+        AuthSession::MakeDefault(AuthHeaders(auth_config.token()));
+    ICEBERG_ASSIGN_OR_RAISE(init_token_response_,
+                            FetchToken(init_client, *bootstrap_session, auth_config));
+    return AuthSession::MakeDefault(AuthHeaders(init_token_response_->access_token));
+  }
+
+  /// \brief Build the long-lived catalog session.
+  ///
+  /// Preference order: token cached by InitSession(), statically configured token,
+  /// token fetched with the configured credential, no authentication.
+  Result<std::shared_ptr<AuthSession>> CatalogSession(
+      HttpClient& client,
+      const std::unordered_map<std::string, std::string>& properties) override {
+    ICEBERG_ASSIGN_OR_RAISE(auto auth_config,
+                            AuthProperties::FromProperties(properties));
+
+    // Hand over the token obtained by InitSession(), consuming the cache.
+    if (init_token_response_) {
+      const OAuthTokenResponse cached = std::move(*init_token_response_);
+      init_token_response_.reset();
+      return MakeOAuth2Session(cached, auth_config, client);
+    }
+
+    // A statically configured token is used verbatim.
+    if (const std::string token = auth_config.token(); !token.empty()) {
+      return AuthSession::MakeDefault(AuthHeaders(token));
+    }
+
+    // Nothing to authenticate with.
+    if (auth_config.credential().empty()) {
+      return AuthSession::MakeDefault({});
+    }
+
+    // client_credentials grant. No token is configured at this point, so the
+    // session passed to FetchToken() carries no Authorization header.
+    auto request_session = AuthSession::MakeDefault(AuthHeaders(auth_config.token()));
+    ICEBERG_ASSIGN_OR_RAISE(auto fetched,
+                            FetchToken(client, *request_session, auth_config));
+    // TODO(lishuxu): should we directly pass config to the MakeOAuth2 call?
+    return MakeOAuth2Session(fetched, auth_config, client);
+  }
+
+  // TODO(lishuxu): Override TableSession() for token exchange (RFC 8693).
+  // TODO(lishuxu): Override ContextualSession() for per-context exchange.
+
+ private:
+  /// Wrap a token response in an OAuth2 session configured from `config`.
+  static std::shared_ptr<AuthSession> MakeOAuth2Session(const OAuthTokenResponse& token,
+                                                        const AuthProperties& config,
+                                                        HttpClient& client) {
+    return AuthSession::MakeOAuth2(token, config.oauth2_server_uri(), config.client_id(),
+                                   config.client_secret(), config.scope(), client);
+  }
+
+  /// Token fetched by InitSession(), kept until CatalogSession() consumes it.
+  std::optional<OAuthTokenResponse> init_token_response_;
+};
+
+/// \brief Factory for the OAuth2 authentication manager.
+///
+/// Neither argument is consulted here; the returned manager receives its
+/// configuration through the properties passed to InitSession() / CatalogSession().
+Result<std::unique_ptr<AuthManager>> MakeOAuth2Manager(
+    [[maybe_unused]] std::string_view name,
+    [[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
+  return std::make_unique<OAuth2Manager>();
+}
+
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_manager_internal.h 
b/src/iceberg/catalog/rest/auth/auth_manager_internal.h
index 96e45239..051d0550 100644
--- a/src/iceberg/catalog/rest/auth/auth_manager_internal.h
+++ b/src/iceberg/catalog/rest/auth/auth_manager_internal.h
@@ -42,4 +42,9 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
     std::string_view name,
     const std::unordered_map<std::string, std::string>& properties);
 
+/// \brief Create an OAuth2 authentication manager.
+Result<std::unique_ptr<AuthManager>> MakeOAuth2Manager(
+    std::string_view name,
+    const std::unordered_map<std::string, std::string>& properties);
+
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_managers.cc 
b/src/iceberg/catalog/rest/auth/auth_managers.cc
index d0bf2484..f55885d7 100644
--- a/src/iceberg/catalog/rest/auth/auth_managers.cc
+++ b/src/iceberg/catalog/rest/auth/auth_managers.cc
@@ -52,8 +52,8 @@ std::string InferAuthType(
   }
 
   // Infer from OAuth2 properties (credential or token)
-  bool has_credential = properties.contains(AuthProperties::kOAuth2Credential);
-  bool has_token = properties.contains(AuthProperties::kOAuth2Token);
+  bool has_credential = properties.contains(AuthProperties::kCredential.key());
+  bool has_token = properties.contains(AuthProperties::kToken.key());
   if (has_credential || has_token) {
     return AuthProperties::kAuthTypeOAuth2;
   }
@@ -65,6 +65,7 @@ AuthManagerRegistry CreateDefaultRegistry() {
   return {
       {AuthProperties::kAuthTypeNone, MakeNoopAuthManager},
       {AuthProperties::kAuthTypeBasic, MakeBasicAuthManager},
+      {AuthProperties::kAuthTypeOAuth2, MakeOAuth2Manager},
   };
 }
 
diff --git a/src/iceberg/catalog/rest/auth/auth_properties.cc 
b/src/iceberg/catalog/rest/auth/auth_properties.cc
new file mode 100644
index 00000000..dcf16782
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/auth_properties.cc
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/auth/auth_properties.h"
+
+#include <utility>
+
+#include "iceberg/catalog/rest/catalog_properties.h"
+
+namespace iceberg::rest::auth {
+
+namespace {
+
+/// Split a "client_id:client_secret" credential at the first colon.
+///
+/// Without a colon the whole string is treated as the secret and the id is empty.
+std::pair<std::string, std::string> ParseCredential(const std::string& credential) {
+  const auto separator = credential.find(':');
+  const bool has_id = separator != std::string::npos;
+  std::string id = has_id ? credential.substr(0, separator) : std::string{};
+  std::string secret = has_id ? credential.substr(separator + 1) : credential;
+  return {std::move(id), std::move(secret)};
+}
+
+}  // namespace
+
+/// Collect the optional token-request parameters (audience, resource) that have a
+/// non-empty value, keyed by their property name.
+std::unordered_map<std::string, std::string> AuthProperties::optional_oauth_params()
+    const {
+  std::unordered_map<std::string, std::string> result;
+  const auto add_if_present = [&](const auto& entry) {
+    auto value = Get(entry);
+    if (!value.empty()) {
+      result.emplace(entry.key(), std::move(value));
+    }
+  };
+  add_if_present(kAudience);
+  add_if_present(kResource);
+  return result;
+}
+
+/// Build an AuthProperties from a flat property map.
+///
+/// Besides copying the map, this splits the credential into client id / secret and,
+/// when no token endpoint is configured, derives one from the catalog URI.
+Result<AuthProperties> AuthProperties::FromProperties(
+    const std::unordered_map<std::string, std::string>& properties) {
+  AuthProperties result;
+  result.configs_ = properties;
+
+  // Credential has the form "client_id:client_secret"; the id part is optional.
+  if (const auto raw_credential = result.credential(); !raw_credential.empty()) {
+    auto parsed = ParseCredential(raw_credential);
+    result.client_id_ = std::move(parsed.first);
+    result.client_secret_ = std::move(parsed.second);
+  }
+
+  // An explicitly configured, non-empty token endpoint is kept untouched.
+  const auto endpoint_it = properties.find(kOAuth2ServerUri.key());
+  const bool endpoint_configured =
+      endpoint_it != properties.end() && !endpoint_it->second.empty();
+  if (!endpoint_configured) {
+    // Otherwise fall back to "<catalog uri>/<default token path>", dropping any
+    // trailing slashes from the catalog URI first.
+    const auto catalog_uri_it = properties.find(RestCatalogProperties::kUri.key());
+    if (catalog_uri_it != properties.end() && !catalog_uri_it->second.empty()) {
+      std::string endpoint = catalog_uri_it->second;
+      while (!endpoint.empty() && endpoint.back() == '/') {
+        endpoint.pop_back();
+      }
+      endpoint += "/";
+      endpoint += std::string(kOAuth2ServerUri.value());
+      result.Set(kOAuth2ServerUri, endpoint);
+    }
+  }
+
+  // TODO(lishuxu): Parse JWT exp claim from token to set expires_at_millis_.
+
+  return result;
+}
+
+}  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_properties.h 
b/src/iceberg/catalog/rest/auth/auth_properties.h
index e14b7fcf..05a7ea2c 100644
--- a/src/iceberg/catalog/rest/auth/auth_properties.h
+++ b/src/iceberg/catalog/rest/auth/auth_properties.h
@@ -19,55 +19,88 @@
 
 #pragma once
 
+#include <cstdint>
+#include <optional>
 #include <string>
-#include <string_view>
+#include <unordered_map>
+
+#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/result.h"
+#include "iceberg/util/config.h"
 
 /// \file iceberg/catalog/rest/auth/auth_properties.h
-/// \brief Property keys and constants for REST catalog authentication.
+/// \brief Property keys and configuration for REST catalog authentication.
 
 namespace iceberg::rest::auth {
 
-/// \brief Property keys and constants for authentication configuration.
-///
-/// This struct defines all the property keys used to configure authentication
-/// for the REST catalog. It follows the same naming conventions as Java 
Iceberg.
-struct AuthProperties {
-  /// \brief Property key for specifying the authentication type.
+/// \brief Authentication properties
+class ICEBERG_REST_EXPORT AuthProperties : public ConfigBase<AuthProperties> {
+ public:
+  template <typename T>
+  using Entry = const ConfigBase<AuthProperties>::Entry<T>;
+
+  // ---- Authentication type constants (not Entry-based) ----
+
   inline static const std::string kAuthType = "rest.auth.type";
-  /// \brief Authentication type: no authentication.
   inline static const std::string kAuthTypeNone = "none";
-  /// \brief Authentication type: HTTP Basic authentication.
   inline static const std::string kAuthTypeBasic = "basic";
-  /// \brief Authentication type: OAuth2 authentication.
   inline static const std::string kAuthTypeOAuth2 = "oauth2";
-  /// \brief Authentication type: AWS SigV4 authentication.
   inline static const std::string kAuthTypeSigV4 = "sigv4";
 
-  /// \brief Property key for Basic auth username.
+  // ---- Basic auth entries ----
+
   inline static const std::string kBasicUsername = "rest.auth.basic.username";
-  /// \brief Property key for Basic auth password.
   inline static const std::string kBasicPassword = "rest.auth.basic.password";
 
-  /// \brief Property key for OAuth2 token (bearer token).
-  inline static const std::string kOAuth2Token = "token";
-  /// \brief Property key for OAuth2 credential (client_id:client_secret).
-  inline static const std::string kOAuth2Credential = "credential";
-  /// \brief Property key for OAuth2 scope.
-  inline static const std::string kOAuth2Scope = "scope";
-  /// \brief Property key for OAuth2 server URI.
-  inline static const std::string kOAuth2ServerUri = "oauth2-server-uri";
-  /// \brief Property key for enabling token refresh.
-  inline static const std::string kOAuth2TokenRefreshEnabled = 
"token-refresh-enabled";
-  /// \brief Default OAuth2 scope for catalog operations.
-  inline static const std::string kOAuth2DefaultScope = "catalog";
-
-  /// \brief Property key for SigV4 region.
+  // ---- SigV4 entries ----
+
   inline static const std::string kSigV4Region = "rest.auth.sigv4.region";
-  /// \brief Property key for SigV4 service name.
   inline static const std::string kSigV4Service = "rest.auth.sigv4.service";
-  /// \brief Property key for SigV4 delegate auth type.
   inline static const std::string kSigV4DelegateAuthType =
       "rest.auth.sigv4.delegate-auth-type";
+
+  // ---- OAuth2 entries ----
+  // Each entry is declared as {property key, value}; FromProperties() looks
+  // properties up by key() and uses kOAuth2ServerUri.value() as the fallback path.
+
+  /// Bearer token used as-is for the Authorization header.
+  inline static Entry<std::string> kToken{"token", ""};
+  /// Credential in "client_id:client_secret" form (the id part is optional).
+  inline static Entry<std::string> kCredential{"credential", ""};
+  /// Scope sent with token requests.
+  inline static Entry<std::string> kScope{"scope", "catalog"};
+  /// Token endpoint. When unset or empty, FromProperties() derives it by appending
+  /// this entry's value to the catalog URI.
+  inline static Entry<std::string> kOAuth2ServerUri{"oauth2-server-uri",
+                                                    "v1/oauth/tokens"};
+  /// Whether token refresh is enabled.
+  inline static Entry<bool> kKeepRefreshed{"token-refresh-enabled", true};
+  /// Whether token exchange is enabled.
+  inline static Entry<bool> kExchangeEnabled{"token-exchange-enabled", true};
+  /// Optional token-request parameter; see optional_oauth_params().
+  inline static Entry<std::string> kAudience{"audience", ""};
+  /// Optional token-request parameter; see optional_oauth_params().
+  inline static Entry<std::string> kResource{"resource", ""};
+
+  /// \brief Build an AuthProperties from a properties map.
+  ///
+  /// Also parses the credential into client id / secret and resolves the token
+  /// endpoint from the catalog URI when none is configured.
+  static Result<AuthProperties> FromProperties(
+      const std::unordered_map<std::string, std::string>& properties);
+
+  /// \brief Get the bearer token.
+  std::string token() const { return Get(kToken); }
+  /// \brief Get the raw credential string.
+  std::string credential() const { return Get(kCredential); }
+  /// \brief Get the OAuth2 scope.
+  std::string scope() const { return Get(kScope); }
+  /// \brief Get the token endpoint URI.
+  std::string oauth2_server_uri() const { return Get(kOAuth2ServerUri); }
+  /// \brief Whether token refresh is enabled.
+  bool keep_refreshed() const { return Get(kKeepRefreshed); }
+  /// \brief Whether token exchange is enabled.
+  bool exchange_enabled() const { return Get(kExchangeEnabled); }
+
+  /// \brief Parsed client_id from credential (empty if no colon).
+  const std::string& client_id() const { return client_id_; }
+  /// \brief Parsed client_secret from credential.
+  const std::string& client_secret() const { return client_secret_; }
+
+  /// \brief Build optional OAuth params (audience, resource) from config.
+  ///
+  /// Only parameters with a non-empty value are included.
+  std::unordered_map<std::string, std::string> optional_oauth_params() const;
+
+ private:
+  // Filled in by FromProperties() from the credential.
+  std::string client_id_;
+  std::string client_secret_;
+  // NOTE(review): the two members below are not assigned by FromProperties() in this
+  // change; its TODO mentions deriving expires_at_millis_ from the token's JWT exp
+  // claim. Confirm they are populated before relying on them.
+  std::string token_type_;
+  std::optional<int64_t> expires_at_millis_;
 };
 
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_session.cc 
b/src/iceberg/catalog/rest/auth/auth_session.cc
index 00ed946a..7251dc4a 100644
--- a/src/iceberg/catalog/rest/auth/auth_session.cc
+++ b/src/iceberg/catalog/rest/auth/auth_session.cc
@@ -21,6 +21,8 @@
 
 #include <utility>
 
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
+
 namespace iceberg::rest::auth {
 
 namespace {
@@ -49,4 +51,13 @@ std::shared_ptr<AuthSession> AuthSession::MakeDefault(
   return std::make_shared<DefaultAuthSession>(std::move(headers));
 }
 
+/// Build a session from an OAuth2 token response.
+///
+/// Only the access token is used for now; the remaining parameters are accepted but
+/// ignored until refresh support is implemented.
+std::shared_ptr<AuthSession> AuthSession::MakeOAuth2(
+    const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/,
+    const std::string& /*client_id*/, const std::string& /*client_secret*/,
+    const std::string& /*scope*/, HttpClient& /*client*/) {
+  // TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support.
+  std::string bearer_value{kBearerPrefix};
+  bearer_value += initial_token.access_token;
+
+  std::unordered_map<std::string, std::string> headers;
+  headers.emplace(std::string(kAuthorizationHeader), std::move(bearer_value));
+  return MakeDefault(std::move(headers));
+}
+
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/auth_session.h 
b/src/iceberg/catalog/rest/auth/auth_session.h
index d81b3d93..26b93877 100644
--- a/src/iceberg/catalog/rest/auth/auth_session.h
+++ b/src/iceberg/catalog/rest/auth/auth_session.h
@@ -24,6 +24,7 @@
 #include <unordered_map>
 
 #include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/catalog/rest/type_fwd.h"
 #include "iceberg/result.h"
 
 /// \file iceberg/catalog/rest/auth/auth_session.h
@@ -70,6 +71,27 @@ class ICEBERG_REST_EXPORT AuthSession {
   /// \return A new session that adds the given headers to requests.
   static std::shared_ptr<AuthSession> MakeDefault(
       std::unordered_map<std::string, std::string> headers);
+
+  /// \brief Create a session from an OAuth2 token response.
+  ///
+  /// This factory is intended to return a session that holds an access token
+  /// and refreshes it transparently once it has expired. The definition in
+  /// auth_session.cc does not do that yet (see its TODO): it ignores every
+  /// argument except `initial_token` and returns a default session carrying
+  /// `Authorization: Bearer <access_token>`.
+  ///
+  /// \param initial_token The initial token response from FetchToken().
+  /// \param token_endpoint Full URL of the OAuth2 token endpoint (currently unused).
+  /// \param client_id OAuth2 client ID (currently unused).
+  /// \param client_secret OAuth2 client secret (currently unused).
+  /// \param scope OAuth2 scope (currently unused).
+  /// \param client HTTP client reserved for refresh requests (currently unused).
+  /// \return A new session that sends the access token as a bearer token.
+  static std::shared_ptr<AuthSession> MakeOAuth2(const OAuthTokenResponse& initial_token,
+                                                 const std::string& token_endpoint,
+                                                 const std::string& client_id,
+                                                 const std::string& client_secret,
+                                                 const std::string& scope,
+                                                 HttpClient& client);
 };
 
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.cc 
b/src/iceberg/catalog/rest/auth/oauth2_util.cc
new file mode 100644
index 00000000..3d209d2b
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/oauth2_util.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
+
+#include <utility>
+
+#include <nlohmann/json.hpp>
+
+#include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/error_handlers.h"
+#include "iceberg/catalog/rest/http_client.h"
+#include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/json_serde_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::rest::auth {
+
+namespace {
+
+constexpr std::string_view kGrantType = "grant_type";
+constexpr std::string_view kClientCredentials = "client_credentials";
+constexpr std::string_view kClientId = "client_id";
+constexpr std::string_view kClientSecret = "client_secret";
+constexpr std::string_view kScope = "scope";
+
+}  // namespace
+
+/// Produce the headers carrying `token` as a bearer credential; an empty token
+/// produces no headers at all.
+std::unordered_map<std::string, std::string> AuthHeaders(const std::string& token) {
+  std::unordered_map<std::string, std::string> headers;
+  if (token.empty()) {
+    return headers;
+  }
+  headers.emplace(std::string(kAuthorizationHeader), std::string(kBearerPrefix) + token);
+  return headers;
+}
+
+/// Request a token from the configured endpoint using the client_credentials grant.
+///
+/// The form always carries grant_type, client_secret and scope. client_id is added
+/// only when non-empty, followed by any optional parameters whose names are not
+/// already present. The response body is parsed and validated before it is returned.
+Result<OAuthTokenResponse> FetchToken(HttpClient& client, AuthSession& session,
+                                      const AuthProperties& properties) {
+  std::unordered_map<std::string, std::string> form;
+  form.emplace(std::string(kGrantType), std::string(kClientCredentials));
+  form.emplace(std::string(kClientSecret), properties.client_secret());
+  form.emplace(std::string(kScope), properties.scope());
+  if (const auto& id = properties.client_id(); !id.empty()) {
+    form.emplace(std::string(kClientId), id);
+  }
+  // Range insert keeps existing entries, so optional params never override the
+  // fields set above.
+  const auto extras = properties.optional_oauth_params();
+  form.insert(extras.begin(), extras.end());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto http_response,
+      client.PostForm(properties.oauth2_server_uri(), form,
+                      /*headers=*/{}, *DefaultErrorHandler::Instance(), session));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto parsed_json, FromJsonString(http_response.body()));
+  ICEBERG_ASSIGN_OR_RAISE(auto token, FromJson<OAuthTokenResponse>(parsed_json));
+  ICEBERG_RETURN_UNEXPECTED(token.Validate());
+  return token;
+}
+
+}  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/auth/oauth2_util.h 
b/src/iceberg/catalog/rest/auth/oauth2_util.h
new file mode 100644
index 00000000..39dd1296
--- /dev/null
+++ b/src/iceberg/catalog/rest/auth/oauth2_util.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <string_view>
+#include <unordered_map>
+
+#include "iceberg/catalog/rest/iceberg_rest_export.h"
+#include "iceberg/catalog/rest/type_fwd.h"
+#include "iceberg/catalog/rest/types.h"
+#include "iceberg/result.h"
+
+/// \file iceberg/catalog/rest/auth/oauth2_util.h
+/// \brief OAuth2 token utilities for REST catalog authentication.
+
+namespace iceberg::rest::auth {
+
+inline constexpr std::string_view kAuthorizationHeader = "Authorization";
+inline constexpr std::string_view kBearerPrefix = "Bearer ";
+
+/// \brief Fetch an OAuth2 token using the client_credentials grant type.
+///
+/// \param client HTTP client to use for the request.
+/// \param session Auth session for the request headers.
+/// \param properties Auth configuration containing credential, scope,
+///        token endpoint, and optional OAuth params.
+/// \return The token response or an error.
+ICEBERG_REST_EXPORT Result<OAuthTokenResponse> FetchToken(
+    HttpClient& client, AuthSession& session, const AuthProperties& 
properties);
+
+/// \brief Build auth headers from a token string.
+///
+/// \param token Bearer token string (may be empty).
+/// \return Headers map with Authorization header if token is non-empty.
+ICEBERG_REST_EXPORT std::unordered_map<std::string, std::string> AuthHeaders(
+    const std::string& token);
+
+}  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/http_client.cc 
b/src/iceberg/catalog/rest/http_client.cc
index b0824621..2e383b0a 100644
--- a/src/iceberg/catalog/rest/http_client.cc
+++ b/src/iceberg/catalog/rest/http_client.cc
@@ -75,7 +75,7 @@ Result<cpr::Header> BuildHeaders(
     auth::AuthSession& session) {
   std::unordered_map<std::string, std::string> headers(default_headers);
   for (const auto& [key, val] : request_headers) {
-    headers.emplace(key, val);
+    headers.insert_or_assign(key, val);
   }
   ICEBERG_RETURN_UNEXPECTED(session.Authenticate(headers));
   return cpr::Header(headers.begin(), headers.end());
diff --git a/src/iceberg/catalog/rest/json_serde.cc 
b/src/iceberg/catalog/rest/json_serde.cc
index 727881f6..eebdc196 100644
--- a/src/iceberg/catalog/rest/json_serde.cc
+++ b/src/iceberg/catalog/rest/json_serde.cc
@@ -72,6 +72,12 @@ constexpr std::string_view kStack = "stack";
 constexpr std::string_view kError = "error";
 constexpr std::string_view kIdentifier = "identifier";
 constexpr std::string_view kRequirements = "requirements";
+constexpr std::string_view kAccessToken = "access_token";
+constexpr std::string_view kTokenType = "token_type";
+constexpr std::string_view kExpiresIn = "expires_in";
+constexpr std::string_view kIssuedTokenType = "issued_token_type";
+constexpr std::string_view kRefreshToken = "refresh_token";
+constexpr std::string_view kOAuthScope = "scope";
 
 }  // namespace
 
@@ -462,6 +468,44 @@ Result<CommitTableResponse> 
CommitTableResponseFromJson(const nlohmann::json& js
   return response;
 }
 
+/// Serialize an OAuthTokenResponse.
+///
+/// access_token and token_type are always written; expires_in, issued_token_type,
+/// refresh_token and scope are written only when set, so the output round-trips
+/// through OAuthTokenResponseFromJson() without losing the refresh token.
+nlohmann::json ToJson(const OAuthTokenResponse& response) {
+  nlohmann::json json;
+  json[kAccessToken] = response.access_token;
+  json[kTokenType] = response.token_type;
+  if (response.expires_in_secs.has_value()) {
+    json[kExpiresIn] = response.expires_in_secs.value();
+  }
+  if (!response.issued_token_type.empty()) {
+    json[kIssuedTokenType] = response.issued_token_type;
+  }
+  // Previously omitted even though the parser reads it.
+  if (!response.refresh_token.empty()) {
+    json[kRefreshToken] = response.refresh_token;
+  }
+  if (!response.scope.empty()) {
+    json[kOAuthScope] = response.scope;
+  }
+  return json;
+}
+
+/// Parse an OAuthTokenResponse from JSON.
+///
+/// access_token and token_type are read with GetJsonValue; expires_in is read only
+/// when the key is present; issued_token_type, refresh_token and scope are read with
+/// GetJsonValueOrDefault (presumably yielding an empty string when absent - confirm
+/// against that helper). The first failing step determines the returned error, and
+/// the assembled response is validated before it is returned.
+Result<OAuthTokenResponse> OAuthTokenResponseFromJson(const nlohmann::json& json) {
+  OAuthTokenResponse response;
+  ICEBERG_ASSIGN_OR_RAISE(response.access_token,
+                          GetJsonValue<std::string>(json, kAccessToken));
+  ICEBERG_ASSIGN_OR_RAISE(response.token_type,
+                          GetJsonValue<std::string>(json, kTokenType));
+  // TODO(lishuxu): When implementing auto-refresh, extract exp claim
+  // from JWT if expires_in is missing.
+  if (json.contains(std::string(kExpiresIn))) {
+    ICEBERG_ASSIGN_OR_RAISE(auto val, GetJsonValue<int64_t>(json, kExpiresIn));
+    response.expires_in_secs = val;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(response.issued_token_type,
+                          GetJsonValueOrDefault<std::string>(json, kIssuedTokenType));
+  ICEBERG_ASSIGN_OR_RAISE(response.refresh_token,
+                          GetJsonValueOrDefault<std::string>(json, kRefreshToken));
+  ICEBERG_ASSIGN_OR_RAISE(response.scope,
+                          GetJsonValueOrDefault<std::string>(json, kOAuthScope));
+  // Rejects an empty access_token / token_type and unsupported token types.
+  ICEBERG_RETURN_UNEXPECTED(response.Validate());
+  return response;
+}
+
 #define ICEBERG_DEFINE_FROM_JSON(Model)                       \
   template <>                                                 \
   Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -483,5 +527,6 @@ ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
 ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
 ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
 ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
+ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse)
 
 }  // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/json_serde_internal.h 
b/src/iceberg/catalog/rest/json_serde_internal.h
index f8bdd78f..820e077d 100644
--- a/src/iceberg/catalog/rest/json_serde_internal.h
+++ b/src/iceberg/catalog/rest/json_serde_internal.h
@@ -58,6 +58,7 @@ ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
+ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
 
 #undef ICEBERG_DECLARE_JSON_SERDE
 
diff --git a/src/iceberg/catalog/rest/meson.build 
b/src/iceberg/catalog/rest/meson.build
index 3a333963..ef250045 100644
--- a/src/iceberg/catalog/rest/meson.build
+++ b/src/iceberg/catalog/rest/meson.build
@@ -18,7 +18,9 @@
 iceberg_rest_sources = files(
     'auth/auth_manager.cc',
     'auth/auth_managers.cc',
+    'auth/auth_properties.cc',
     'auth/auth_session.cc',
+    'auth/oauth2_util.cc',
     'catalog_properties.cc',
     'endpoint.cc',
     'error_handlers.cc',
@@ -82,6 +84,7 @@ install_headers(
         'auth/auth_managers.h',
         'auth/auth_properties.h',
         'auth/auth_session.h',
+        'auth/oauth2_util.h',
     ],
     subdir: 'iceberg/catalog/rest/auth',
 )
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc 
b/src/iceberg/catalog/rest/rest_catalog.cc
index 42dc8659..caef5041 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -71,8 +71,19 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& 
paths,
                                         auth::AuthSession& session) {
   ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config());
   HttpClient client(current_config.ExtractHeaders());
+
+  // Send the client's warehouse location to the service to keep in sync.
+  // This is needed for cases where the warehouse is configured client side, 
but may
+  // be used on the server side, like the Hive Metastore, where both client 
and service
+  // may have a warehouse location.
+  std::unordered_map<std::string, std::string> params;
+  std::string warehouse = 
current_config.Get(RestCatalogProperties::kWarehouse);
+  if (!warehouse.empty()) {
+    params[RestCatalogProperties::kWarehouse.key()] = std::move(warehouse);
+  }
+
   ICEBERG_ASSIGN_OR_RAISE(const auto response,
-                          client.Get(config_path, /*params=*/{}, 
/*headers=*/{},
+                          client.Get(config_path, params, /*headers=*/{},
                                      *DefaultErrorHandler::Instance(), 
session));
   ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
   return CatalogConfigFromJson(json);
diff --git a/src/iceberg/catalog/rest/type_fwd.h 
b/src/iceberg/catalog/rest/type_fwd.h
index e7286105..62bc14d1 100644
--- a/src/iceberg/catalog/rest/type_fwd.h
+++ b/src/iceberg/catalog/rest/type_fwd.h
@@ -22,10 +22,12 @@
 /// \file iceberg/catalog/rest/type_fwd.h
 /// Forward declarations and enum definitions for Iceberg REST API types.
 
+#include "iceberg/catalog/rest/auth/auth_properties.h"
 namespace iceberg::rest {
 
 struct ErrorResponse;
 struct LoadTableResult;
+struct OAuthTokenResponse;
 
 class Endpoint;
 class ErrorHandler;
@@ -39,6 +41,7 @@ class RestCatalogProperties;
 namespace iceberg::rest::auth {
 
 class AuthManager;
+class AuthProperties;
 class AuthSession;
 
 }  // namespace iceberg::rest::auth
diff --git a/src/iceberg/catalog/rest/types.cc 
b/src/iceberg/catalog/rest/types.cc
index 3416bfe3..3abfb140 100644
--- a/src/iceberg/catalog/rest/types.cc
+++ b/src/iceberg/catalog/rest/types.cc
@@ -19,6 +19,8 @@
 
 #include "iceberg/catalog/rest/types.h"
 
+#include <algorithm>
+
 #include "iceberg/partition_spec.h"
 #include "iceberg/schema.h"
 #include "iceberg/sort_order.h"
@@ -116,4 +118,21 @@ bool CommitTableResponse::operator==(const 
CommitTableResponse& other) const {
   return true;
 }
 
+/// \brief Checks that this token response carries the fields the client requires.
+///
+/// \return An empty (OK) status when `access_token` is non-empty and `token_type`
+/// is "bearer" or "N_A" compared ASCII case-insensitively; otherwise the status
+/// produced by ValidationFailed describing the missing or unsupported field.
+Status OAuthTokenResponse::Validate() const {
+  if (access_token.empty()) {
+    return ValidationFailed("OAuth2 token response missing required 'access_token'");
+  }
+  if (token_type.empty()) {
+    return ValidationFailed("OAuth2 token response missing required 'token_type'");
+  }
+  // token_type must be "bearer" or "N_A" (case-insensitive). Fold ASCII letters by
+  // hand instead of calling ::tolower: passing a plain char with a negative value to
+  // ::tolower is undefined behavior, and its mapping depends on the process locale.
+  std::string lower_type = token_type;
+  std::ranges::transform(lower_type, lower_type.begin(), [](char c) {
+    return (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
+  });
+  if (lower_type != "bearer" && lower_type != "n_a") {
+    return ValidationFailed(R"(Unsupported token type: {} (must be "bearer" or "N_A"))",
+                            token_type);
+  }
+  return {};
+}
+
 }  // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h
index 93e7048a..6495a651 100644
--- a/src/iceberg/catalog/rest/types.h
+++ b/src/iceberg/catalog/rest/types.h
@@ -19,7 +19,9 @@
 
 #pragma once
 
+#include <cstdint>
 #include <memory>
+#include <optional>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -278,4 +280,19 @@ struct ICEBERG_REST_EXPORT CommitTableResponse {
   bool operator==(const CommitTableResponse& other) const;
 };
 
+/// \brief Response from an OAuth2 token endpoint.
+///
+/// Fields marked optional are left empty (or without a value, for
+/// `expires_in_secs`) when the response does not provide them.
+struct ICEBERG_REST_EXPORT OAuthTokenResponse {
+  /// Required; Validate() rejects an empty value.
+  std::string access_token;
+  /// Required; Validate() accepts only "bearer" or "N_A".
+  std::string token_type;
+  /// Optional; seconds until expiration.
+  std::optional<int64_t> expires_in_secs;
+  /// Optional; for token exchange.
+  std::string issued_token_type;
+  /// Optional.
+  std::string refresh_token;
+  /// Optional.
+  std::string scope;
+
+  /// \brief Validates the token response.
+  Status Validate() const;
+
+  bool operator==(const OAuthTokenResponse&) const = default;
+};
+
 }  // namespace iceberg::rest
diff --git a/src/iceberg/test/auth_manager_test.cc 
b/src/iceberg/test/auth_manager_test.cc
index 82db393d..bd06fee3 100644
--- a/src/iceberg/test/auth_manager_test.cc
+++ b/src/iceberg/test/auth_manager_test.cc
@@ -24,15 +24,29 @@
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
+#include <nlohmann/json.hpp>
 
 #include "iceberg/catalog/rest/auth/auth_managers.h"
 #include "iceberg/catalog/rest/auth/auth_properties.h"
 #include "iceberg/catalog/rest/auth/auth_session.h"
+#include "iceberg/catalog/rest/auth/oauth2_util.h"
 #include "iceberg/catalog/rest/http_client.h"
+#include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/json_serde_internal.h"
 #include "iceberg/test/matchers.h"
 
 namespace iceberg::rest::auth {
 
+namespace {
+
+/// Test helper: decodes `str` as JSON and converts the document into an
+/// OAuthTokenResponse.
+Result<OAuthTokenResponse> ParseTokenResponse(const std::string& str) {
+  ICEBERG_ASSIGN_OR_RAISE(auto parsed, iceberg::FromJsonString(str));
+  return iceberg::rest::FromJson<OAuthTokenResponse>(parsed);
+}
+
+}  // namespace
+
 class AuthManagerTest : public ::testing::Test {
  protected:
   HttpClient client_{{}};
@@ -195,4 +209,153 @@ TEST_F(AuthManagerTest, RegisterCustomAuthManager) {
   EXPECT_EQ(headers["X-Custom-Auth"], "custom-value");
 }
 
+// OAuth2 with an explicitly configured static token: the session must emit that
+// token as a Bearer Authorization header.
+TEST_F(AuthManagerTest, OAuth2StaticToken) {
+  std::unordered_map<std::string, std::string> props{
+      {AuthProperties::kAuthType, "oauth2"},
+      {AuthProperties::kToken.key(), "my-static-token"},
+  };
+
+  auto manager = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(manager, IsOk());
+
+  auto session = manager.value()->CatalogSession(client_, props);
+  ASSERT_THAT(session, IsOk());
+
+  std::unordered_map<std::string, std::string> request_headers;
+  EXPECT_THAT(session.value()->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer my-static-token");
+}
+
+// With no explicit auth type, supplying only a token must still yield a session
+// that authenticates with that token as a Bearer credential.
+TEST_F(AuthManagerTest, OAuth2InferredFromToken) {
+  std::unordered_map<std::string, std::string> props{
+      {AuthProperties::kToken.key(), "inferred-token"},
+  };
+
+  auto manager = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(manager, IsOk());
+
+  auto session = manager.value()->CatalogSession(client_, props);
+  ASSERT_THAT(session, IsOk());
+
+  std::unordered_map<std::string, std::string> request_headers;
+  EXPECT_THAT(session.value()->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer inferred-token");
+}
+
+// Verifies OAuth2 returns an unauthenticated session when neither token nor
+// credential is provided
+TEST_F(AuthManagerTest, OAuth2MissingCredentials) {
+  std::unordered_map<std::string, std::string> properties = {
+      {AuthProperties::kAuthType, "oauth2"},
+  };
+
+  auto manager_result = AuthManagers::Load("test-catalog", properties);
+  ASSERT_THAT(manager_result, IsOk());
+
+  auto session_result = manager_result.value()->CatalogSession(client_, properties);
+  ASSERT_THAT(session_result, IsOk());
+
+  // Session should have no auth headers. Use the IsOk() matcher like the sibling
+  // tests so a failure reports the underlying error rather than just "false".
+  std::unordered_map<std::string, std::string> headers;
+  ASSERT_THAT(session_result.value()->Authenticate(headers), IsOk());
+  EXPECT_FALSE(headers.contains("Authorization"));
+}
+
+// When a token and a credential are both configured, CatalogSession (called
+// without a prior InitSession) must authenticate with the token.
+TEST_F(AuthManagerTest, OAuth2TokenTakesPriorityOverCredential) {
+  std::unordered_map<std::string, std::string> props{
+      {AuthProperties::kAuthType, "oauth2"},
+      {AuthProperties::kCredential.key(), "secret-only"},
+      {AuthProperties::kToken.key(), "my-static-token"},
+      {"uri", "http://localhost:8181"},
+  };
+
+  auto manager = AuthManagers::Load("test-catalog", props);
+  ASSERT_THAT(manager, IsOk());
+
+  auto session = manager.value()->CatalogSession(client_, props);
+  ASSERT_THAT(session, IsOk());
+
+  std::unordered_map<std::string, std::string> request_headers;
+  ASSERT_THAT(session.value()->Authenticate(request_headers), IsOk());
+  EXPECT_EQ(request_headers["Authorization"], "Bearer my-static-token");
+}
+
+// A token response with every field present must populate each member of
+// OAuthTokenResponse.
+TEST_F(AuthManagerTest, OAuthTokenResponseParsing) {
+  const std::string payload = R"({
+    "access_token": "test-access-token",
+    "token_type": "bearer",
+    "expires_in": 3600,
+    "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
+    "refresh_token": "test-refresh-token",
+    "scope": "catalog"
+  })";
+
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+
+  const auto& token = parsed.value();
+  EXPECT_EQ(token.access_token, "test-access-token");
+  EXPECT_EQ(token.token_type, "bearer");
+  ASSERT_TRUE(token.expires_in_secs.has_value());
+  EXPECT_EQ(token.expires_in_secs.value(), 3600);
+  EXPECT_EQ(token.issued_token_type,
+            "urn:ietf:params:oauth:token-type:access_token");
+  EXPECT_EQ(token.refresh_token, "test-refresh-token");
+  EXPECT_EQ(token.scope, "catalog");
+}
+
+// A token response holding only the two required fields must parse, leaving
+// every optional member empty or unset.
+TEST_F(AuthManagerTest, OAuthTokenResponseMinimal) {
+  const std::string payload = R"({
+    "access_token": "token123",
+    "token_type": "Bearer"
+  })";
+
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+
+  const auto& token = parsed.value();
+  EXPECT_EQ(token.access_token, "token123");
+  EXPECT_EQ(token.token_type, "Bearer");
+  EXPECT_FALSE(token.expires_in_secs.has_value());
+  EXPECT_TRUE(token.issued_token_type.empty());
+  EXPECT_TRUE(token.refresh_token.empty());
+  EXPECT_TRUE(token.scope.empty());
+}
+
+// A token response without access_token must be rejected.
+TEST_F(AuthManagerTest, OAuthTokenResponseMissingAccessToken) {
+  const std::string payload = R"({"token_type": "bearer"})";
+  EXPECT_THAT(ParseTokenResponse(payload), ::testing::Not(IsOk()));
+}
+
+// A token response without token_type must be rejected.
+TEST_F(AuthManagerTest, OAuthTokenResponseMissingTokenType) {
+  const std::string payload = R"({"access_token": "token123"})";
+  EXPECT_THAT(ParseTokenResponse(payload), ::testing::Not(IsOk()));
+}
+
+// A token response whose token_type is neither "bearer" nor "N_A" must be
+// rejected.
+TEST_F(AuthManagerTest, OAuthTokenResponseUnsupportedTokenType) {
+  const std::string payload = R"({
+    "access_token": "token123",
+    "token_type": "mac"
+  })";
+  EXPECT_THAT(ParseTokenResponse(payload), ::testing::Not(IsOk()));
+}
+
+// The "N_A" token type must be accepted and preserved as received.
+TEST_F(AuthManagerTest, OAuthTokenResponseNATokenType) {
+  const std::string payload = R"({
+    "access_token": "token123",
+    "token_type": "N_A"
+  })";
+
+  auto parsed = ParseTokenResponse(payload);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(parsed.value().token_type, "N_A");
+}
+
 }  // namespace iceberg::rest::auth


Reply via email to