This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a562834 feat(rest): implement create table (#417)
1a562834 is described below

commit 1a562834ace63fa5df08f516826d0b7d5dd15fe8
Author: Feiyang Li <[email protected]>
AuthorDate: Fri Dec 19 20:13:04 2025 +0800

    feat(rest): implement create table (#417)
---
 src/iceberg/catalog.h                           |   8 +-
 src/iceberg/catalog/memory/in_memory_catalog.cc |   6 +-
 src/iceberg/catalog/memory/in_memory_catalog.h  |   6 +-
 src/iceberg/catalog/rest/CMakeLists.txt         |   3 +-
 src/iceberg/catalog/rest/json_internal.cc       |  55 +++++
 src/iceberg/catalog/rest/json_internal.h        |   1 +
 src/iceberg/catalog/rest/meson.build            |   1 +
 src/iceberg/catalog/rest/rest_catalog.cc        |  51 ++++-
 src/iceberg/catalog/rest/rest_catalog.h         |  20 +-
 src/iceberg/catalog/rest/types.cc               |  72 +++++++
 src/iceberg/catalog/rest/types.h                |  27 ++-
 src/iceberg/json_internal.cc                    |  12 --
 src/iceberg/table_metadata.cc                   |   1 +
 src/iceberg/test/mock_catalog.h                 |   6 +-
 src/iceberg/test/rest_catalog_test.cc           |  67 +++++-
 src/iceberg/test/rest_json_internal_test.cc     | 276 ++++++++++++++++++++++++
 src/iceberg/test/std_io.h                       | 101 +++++++++
 17 files changed, 672 insertions(+), 41 deletions(-)

diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h
index 08965df8..7be56bae 100644
--- a/src/iceberg/catalog.h
+++ b/src/iceberg/catalog.h
@@ -107,11 +107,13 @@ class ICEBERG_EXPORT Catalog {
   /// \param identifier a table identifier
   /// \param schema a schema
   /// \param spec a partition spec
+  /// \param order a sort order
   /// \param location a location for the table; leave empty if unspecified
   /// \param properties a string map of table properties
   /// \return a Table instance or ErrorKind::kAlreadyExists if the table 
already exists
   virtual Result<std::shared_ptr<Table>> CreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) = 0;
 
@@ -131,12 +133,14 @@ class ICEBERG_EXPORT Catalog {
   /// \param identifier a table identifier
   /// \param schema a schema
   /// \param spec a partition spec
+  /// \param order a sort order
   /// \param location a location for the table; leave empty if unspecified
   /// \param properties a string map of table properties
   /// \return a Transaction to create the table or ErrorKind::kAlreadyExists 
if the
   /// table already exists
   virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) = 0;
 
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc
index a0c143c5..b3fd0060 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -400,7 +400,8 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
 }
 
 Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
-    const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+    const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+    const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
     const std::string& location,
     const std::unordered_map<std::string, std::string>& properties) {
   std::unique_lock lock(mutex_);
@@ -439,7 +440,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
 }
 
 Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
-    const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+    const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+    const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
     const std::string& location,
     const std::unordered_map<std::string, std::string>& properties) {
   std::unique_lock lock(mutex_);
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h
index dd72dd89..22a596c1 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.h
+++ b/src/iceberg/catalog/memory/in_memory_catalog.h
@@ -71,7 +71,8 @@ class ICEBERG_EXPORT InMemoryCatalog
   Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const 
override;
 
   Result<std::shared_ptr<Table>> CreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) override;
 
@@ -81,7 +82,8 @@ class ICEBERG_EXPORT InMemoryCatalog
       const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
 
   Result<std::shared_ptr<Transaction>> StageCreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) override;
 
diff --git a/src/iceberg/catalog/rest/CMakeLists.txt b/src/iceberg/catalog/rest/CMakeLists.txt
index 7b36298a..12d77e59 100644
--- a/src/iceberg/catalog/rest/CMakeLists.txt
+++ b/src/iceberg/catalog/rest/CMakeLists.txt
@@ -23,7 +23,8 @@ set(ICEBERG_REST_SOURCES
     json_internal.cc
     resource_paths.cc
     rest_catalog.cc
-    rest_util.cc)
+    rest_util.cc
+    types.cc)
 
 set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS)
 set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS)
diff --git a/src/iceberg/catalog/rest/json_internal.cc b/src/iceberg/catalog/rest/json_internal.cc
index 66e69025..b6bb970e 100644
--- a/src/iceberg/catalog/rest/json_internal.cc
+++ b/src/iceberg/catalog/rest/json_internal.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/catalog/rest/json_internal.h"
 
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -27,6 +28,8 @@
 
 #include "iceberg/catalog/rest/types.h"
 #include "iceberg/json_internal.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/sort_order.h"
 #include "iceberg/table_identifier.h"
 #include "iceberg/util/json_util_internal.h"
 #include "iceberg/util/macros.h"
@@ -336,6 +339,57 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
   return response;
 }
 
+nlohmann::json ToJson(const CreateTableRequest& request) {
+  nlohmann::json json;
+  json[kName] = request.name;
+  SetOptionalStringField(json, kLocation, request.location);
+  if (request.schema) {
+    json[kSchema] = ToJson(*request.schema);
+  }
+  if (request.partition_spec) {
+    json[kPartitionSpec] = ToJson(*request.partition_spec);
+  }
+  if (request.write_order) {
+    json[kWriteOrder] = ToJson(*request.write_order);
+  }
+  if (request.stage_create) {
+    json[kStageCreate] = request.stage_create;
+  }
+  SetContainerField(json, kProperties, request.properties);
+  return json;
+}
+
+Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& 
json) {
+  CreateTableRequest request;
+  ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, 
kName));
+  ICEBERG_ASSIGN_OR_RAISE(request.location,
+                          GetJsonValueOrDefault<std::string>(json, kLocation));
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, GetJsonValue<nlohmann::json>(json, 
kSchema));
+  ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema));
+
+  if (json.contains(kPartitionSpec)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
+                            GetJsonValue<nlohmann::json>(json, 
kPartitionSpec));
+    ICEBERG_ASSIGN_OR_RAISE(request.partition_spec,
+                            PartitionSpecFromJson(request.schema, 
partition_spec,
+                                                  
PartitionSpec::kInitialSpecId));
+  }
+  if (json.contains(kWriteOrder)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order,
+                            GetJsonValue<nlohmann::json>(json, kWriteOrder));
+    ICEBERG_ASSIGN_OR_RAISE(request.write_order,
+                            SortOrderFromJson(sort_order, request.schema));
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(request.stage_create,
+                          GetJsonValueOrDefault<bool>(json, kStageCreate, 
false));
+  ICEBERG_ASSIGN_OR_RAISE(
+      request.properties,
+      GetJsonValueOrDefault<decltype(request.properties)>(json, kProperties));
+  ICEBERG_RETURN_UNEXPECTED(request.Validate());
+  return request;
+}
+
 #define ICEBERG_DEFINE_FROM_JSON(Model)                       \
   template <>                                                 \
   Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -354,5 +408,6 @@ ICEBERG_DEFINE_FROM_JSON(ListTablesResponse)
 ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
 ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
 ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
+ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
 
 }  // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/json_internal.h b/src/iceberg/catalog/rest/json_internal.h
index ba685922..e2a88b4c 100644
--- a/src/iceberg/catalog/rest/json_internal.h
+++ b/src/iceberg/catalog/rest/json_internal.h
@@ -55,6 +55,7 @@ ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
 ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
 ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
 ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
+ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
 
 #undef ICEBERG_DECLARE_JSON_SERDE
 
diff --git a/src/iceberg/catalog/rest/meson.build b/src/iceberg/catalog/rest/meson.build
index cda90c61..a914c7e2 100644
--- a/src/iceberg/catalog/rest/meson.build
+++ b/src/iceberg/catalog/rest/meson.build
@@ -24,6 +24,7 @@ iceberg_rest_sources = files(
     'resource_paths.cc',
     'rest_catalog.cc',
     'rest_util.cc',
+    'types.cc',
 )
 # cpr does not export symbols, so on Windows it must
 # be used as a static lib
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc
index 0d14ea38..eeffffd2 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -40,6 +40,7 @@
 #include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
 #include "iceberg/table.h"
 #include "iceberg/util/macros.h"
 
@@ -77,9 +78,12 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
 
 RestCatalog::~RestCatalog() = default;
 
-Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
-    const RestCatalogProperties& config) {
+Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
+    const RestCatalogProperties& config, std::shared_ptr<FileIO> file_io) {
   ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri());
+  if (!file_io) {
+    return InvalidArgument("FileIO is required to create RestCatalog");
+  }
   ICEBERG_ASSIGN_OR_RAISE(
       auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
                                       
config.Get(RestCatalogProperties::kPrefix)));
@@ -103,14 +107,17 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
   ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
   
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
 
-  return std::unique_ptr<RestCatalog>(
-      new RestCatalog(std::move(final_config), std::move(paths), 
std::move(endpoints)));
+  return std::shared_ptr<RestCatalog>(
+      new RestCatalog(std::move(final_config), std::move(file_io), 
std::move(paths),
+                      std::move(endpoints)));
 }
 
 RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
+                         std::shared_ptr<FileIO> file_io,
                          std::unique_ptr<ResourcePaths> paths,
                          std::unordered_set<Endpoint> endpoints)
     : config_(std::move(config)),
+      file_io_(std::move(file_io)),
       client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
       paths_(std::move(paths)),
       name_(config_->Get(RestCatalogProperties::kName)),
@@ -241,11 +248,33 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const Schema& schema, [[maybe_unused]] const 
PartitionSpec& spec,
-    [[maybe_unused]] const std::string& location,
-    [[maybe_unused]] const std::unordered_map<std::string, std::string>& 
properties) {
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+    const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
+    const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties) {
+  ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, 
Endpoint::CreateTable()));
+  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
+
+  CreateTableRequest request{
+      .name = identifier.name,
+      .location = location,
+      .schema = schema,
+      .partition_spec = spec,
+      .write_order = order,
+      .stage_create = false,
+      .properties = properties,
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
+  ICEBERG_ASSIGN_OR_RAISE(
+      const auto response,
+      client_->Post(path, json_request, /*headers=*/{}, 
*TableErrorHandler::Instance()));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
+  ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
+  return Table::Make(identifier, load_result.metadata,
+                     std::move(load_result.metadata_location), file_io_,
+                     shared_from_this());
 }
 
 Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -257,7 +286,9 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
 
 Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
     [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const Schema& schema, [[maybe_unused]] const 
PartitionSpec& spec,
+    [[maybe_unused]] const std::shared_ptr<Schema>& schema,
+    [[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
+    [[maybe_unused]] const std::shared_ptr<SortOrder>& order,
     [[maybe_unused]] const std::string& location,
     [[maybe_unused]] const std::unordered_map<std::string, std::string>& 
properties) {
   return NotImplemented("Not implemented");
diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h
index 26616827..a8096521 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -20,8 +20,8 @@
 #pragma once
 
 #include <memory>
-#include <set>
 #include <string>
+#include <unordered_set>
 
 #include "iceberg/catalog.h"
 #include "iceberg/catalog/rest/endpoint.h"
@@ -35,7 +35,8 @@
 namespace iceberg::rest {
 
 /// \brief Rest catalog implementation.
-class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
+class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
+                                        public 
std::enable_shared_from_this<RestCatalog> {
  public:
   ~RestCatalog() override;
 
@@ -47,8 +48,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
   /// \brief Create a RestCatalog instance
   ///
   /// \param config the configuration for the RestCatalog
-  /// \return a unique_ptr to RestCatalog instance
-  static Result<std::unique_ptr<RestCatalog>> Make(const 
RestCatalogProperties& config);
+  /// \param file_io the FileIO instance to use for table operations
+  /// \return a shared_ptr to RestCatalog instance
+  static Result<std::shared_ptr<RestCatalog>> Make(const 
RestCatalogProperties& config,
+                                                   std::shared_ptr<FileIO> 
file_io);
 
   std::string_view name() const override;
 
@@ -72,7 +75,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
   Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const 
override;
 
   Result<std::shared_ptr<Table>> CreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) override;
 
@@ -82,7 +86,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
       const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
 
   Result<std::shared_ptr<Transaction>> StageCreateTable(
-      const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+      const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+      const std::shared_ptr<PartitionSpec>& spec, const 
std::shared_ptr<SortOrder>& order,
       const std::string& location,
       const std::unordered_map<std::string, std::string>& properties) override;
 
@@ -100,10 +105,11 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
 
  private:
   RestCatalog(std::unique_ptr<RestCatalogProperties> config,
-              std::unique_ptr<ResourcePaths> paths,
+              std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> 
paths,
               std::unordered_set<Endpoint> endpoints);
 
   std::unique_ptr<RestCatalogProperties> config_;
+  std::shared_ptr<FileIO> file_io_;
   std::unique_ptr<HttpClient> client_;
   std::unique_ptr<ResourcePaths> paths_;
   std::string name_;
diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc
new file mode 100644
index 00000000..5c23e47b
--- /dev/null
+++ b/src/iceberg/catalog/rest/types.cc
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/catalog/rest/types.h"
+
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table_metadata.h"
+
+namespace iceberg::rest {
+
+bool CreateTableRequest::operator==(const CreateTableRequest& other) const {
+  if (name != other.name || location != other.location ||
+      stage_create != other.stage_create || properties != other.properties) {
+    return false;
+  }
+
+  if (!schema != !other.schema) {
+    return false;
+  }
+  if (schema && *schema != *other.schema) {
+    return false;
+  }
+
+  if (!partition_spec != !other.partition_spec) {
+    return false;
+  }
+  if (partition_spec && *partition_spec != *other.partition_spec) {
+    return false;
+  }
+
+  if (!write_order != !other.write_order) {
+    return false;
+  }
+  if (write_order && *write_order != *other.write_order) {
+    return false;
+  }
+  return true;
+}
+
+bool LoadTableResult::operator==(const LoadTableResult& other) const {
+  if (metadata_location != other.metadata_location || config != other.config) {
+    return false;
+  }
+
+  if (!metadata != !other.metadata) {
+    return false;
+  }
+  if (metadata && *metadata != *other.metadata) {
+    return false;
+  }
+  return true;
+}
+
+}  // namespace iceberg::rest
diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h
index 867abc3d..01fe330d 100644
--- a/src/iceberg/catalog/rest/types.h
+++ b/src/iceberg/catalog/rest/types.h
@@ -27,6 +27,7 @@
 #include "iceberg/catalog/rest/endpoint.h"
 #include "iceberg/catalog/rest/iceberg_rest_export.h"
 #include "iceberg/result.h"
+#include "iceberg/schema.h"
 #include "iceberg/table_identifier.h"
 #include "iceberg/type_fwd.h"
 #include "iceberg/util/macros.h"
@@ -138,6 +139,30 @@ struct ICEBERG_REST_EXPORT RenameTableRequest {
   bool operator==(const RenameTableRequest&) const = default;
 };
 
+/// \brief Request to create a table.
+struct ICEBERG_REST_EXPORT CreateTableRequest {
+  std::string name;  // required
+  std::string location;
+  std::shared_ptr<Schema> schema;  // required
+  std::shared_ptr<PartitionSpec> partition_spec;
+  std::shared_ptr<SortOrder> write_order;
+  bool stage_create = false;
+  std::unordered_map<std::string, std::string> properties;
+
+  /// \brief Validates the CreateTableRequest.
+  Status Validate() const {
+    if (name.empty()) {
+      return Invalid("Missing table name");
+    }
+    if (!schema) {
+      return Invalid("Missing schema");
+    }
+    return {};
+  }
+
+  bool operator==(const CreateTableRequest& other) const;
+};
+
 /// \brief An opaque token that allows clients to make use of pagination for 
list APIs.
 using PageToken = std::string;
 
@@ -156,7 +181,7 @@ struct ICEBERG_REST_EXPORT LoadTableResult {
     return {};
   }
 
-  bool operator==(const LoadTableResult&) const = default;
+  bool operator==(const LoadTableResult& other) const;
 };
 
 /// \brief Alias of LoadTableResult used as the body of CreateTableResponse
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 7f25a174..82cb8ee7 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -220,18 +220,6 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
   return SortOrder::Make(*current_schema, order_id, std::move(sort_fields));
 }
 
-Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const nlohmann::json& 
json) {
-  ICEBERG_ASSIGN_OR_RAISE(auto order_id, GetJsonValue<int32_t>(json, 
kOrderId));
-  ICEBERG_ASSIGN_OR_RAISE(auto fields, GetJsonValue<nlohmann::json>(json, 
kFields));
-
-  std::vector<SortField> sort_fields;
-  for (const auto& field_json : fields) {
-    ICEBERG_ASSIGN_OR_RAISE(auto sort_field, SortFieldFromJson(field_json));
-    sort_fields.push_back(std::move(*sort_field));
-  }
-  return SortOrder::Make(order_id, std::move(sort_fields));
-}
-
 nlohmann::json ToJson(const SchemaField& field) {
   nlohmann::json json;
   json[kId] = field.field_id();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 61bb8e08..e787b6fd 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -157,6 +157,7 @@ bool operator==(const TableMetadata& lhs, const TableMetadata& rhs) {
          lhs.last_column_id == rhs.last_column_id &&
          lhs.current_schema_id == rhs.current_schema_id &&
          SharedPtrVectorEquals(lhs.schemas, rhs.schemas) &&
+         SharedPtrVectorEquals(lhs.partition_specs, rhs.partition_specs) &&
          lhs.default_spec_id == rhs.default_spec_id &&
          lhs.last_partition_id == rhs.last_partition_id &&
          lhs.properties == rhs.properties &&
diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h
index 1f43cfab..7873e6fe 100644
--- a/src/iceberg/test/mock_catalog.h
+++ b/src/iceberg/test/mock_catalog.h
@@ -56,7 +56,8 @@ class MockCatalog : public Catalog {
               (const, override));
 
   MOCK_METHOD((Result<std::shared_ptr<Table>>), CreateTable,
-              (const TableIdentifier&, const Schema&, const PartitionSpec&,
+              (const TableIdentifier&, const std::shared_ptr<Schema>&,
+               const std::shared_ptr<PartitionSpec>&, const 
std::shared_ptr<SortOrder>&,
                const std::string&, (const std::unordered_map<std::string, 
std::string>&)),
               (override));
 
@@ -67,7 +68,8 @@ class MockCatalog : public Catalog {
               (override));
 
   MOCK_METHOD((Result<std::shared_ptr<Transaction>>), StageCreateTable,
-              (const TableIdentifier&, const Schema&, const PartitionSpec&,
+              (const TableIdentifier&, const std::shared_ptr<Schema>&,
+               const std::shared_ptr<PartitionSpec>&, const 
std::shared_ptr<SortOrder>&,
                const std::string&, (const std::unordered_map<std::string, 
std::string>&)),
               (override));
 
diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc
index 725ad7ec..fb1f7061 100644
--- a/src/iceberg/test/rest_catalog_test.cc
+++ b/src/iceberg/test/rest_catalog_test.cc
@@ -39,9 +39,14 @@
 #include "iceberg/catalog/rest/error_handlers.h"
 #include "iceberg/catalog/rest/http_client.h"
 #include "iceberg/catalog/rest/json_internal.h"
+#include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
 #include "iceberg/table_identifier.h"
 #include "iceberg/test/matchers.h"
+#include "iceberg/test/std_io.h"
 #include "iceberg/test/test_resource.h"
 #include "iceberg/test/util/docker_compose_util.h"
 
@@ -120,14 +125,15 @@ class RestCatalogIntegrationTest : public ::testing::Test {
   void TearDown() override {}
 
   // Helper function to create a REST catalog instance
-  Result<std::unique_ptr<RestCatalog>> CreateCatalog() {
+  Result<std::shared_ptr<RestCatalog>> CreateCatalog() {
     auto config = RestCatalogProperties::default_properties();
     config
         ->Set(RestCatalogProperties::kUri,
               std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
         .Set(RestCatalogProperties::kName, std::string(kCatalogName))
         .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName));
-    return RestCatalog::Make(*config);
+    auto file_io = std::make_shared<test::StdFileIO>();
+    return RestCatalog::Make(*config, std::move(file_io));  // sole FileIO instance
   }
 
   static inline std::unique_ptr<DockerCompose> docker_compose_;
@@ -337,4 +343,61 @@ TEST_F(RestCatalogIntegrationTest, DropNamespace) {
   EXPECT_FALSE(*exists_result);
 }
 
+TEST_F(RestCatalogIntegrationTest, CreateTable) {
+  auto catalog_result = CreateCatalog();
+  ASSERT_THAT(catalog_result, IsOk());
+  auto& catalog = catalog_result.value();
+
+  // Create nested namespace with properties
+  Namespace ns{.levels = {"test_create_table", "apple", "ios"}};
+  std::unordered_map<std::string, std::string> ns_properties{{"owner", "ray"},
+                                                             {"community", 
"apache"}};
+
+  // Create parent namespaces first
+  auto status = catalog->CreateNamespace(Namespace{.levels = 
{"test_create_table"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status =
+      catalog->CreateNamespace(Namespace{.levels = {"test_create_table", 
"apple"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status = catalog->CreateNamespace(ns, ns_properties);
+  EXPECT_THAT(status, IsOk());
+
+  // Create schema
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(1, "foo", string()),
+                               SchemaField::MakeRequired(2, "bar", int32()),
+                               SchemaField::MakeOptional(3, "baz", boolean())},
+      /*schema_id=*/1);
+
+  // Create partition spec and sort order (unpartitioned and unsorted)
+  auto partition_spec_result = 
PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0);
+  ASSERT_THAT(partition_spec_result, IsOk());
+  auto partition_spec = 
std::shared_ptr<PartitionSpec>(std::move(*partition_spec_result));
+
+  auto sort_order_result =
+      SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector<SortField>{});
+  ASSERT_THAT(sort_order_result, IsOk());
+  auto sort_order = std::shared_ptr<SortOrder>(std::move(*sort_order_result));
+
+  // Create table
+  TableIdentifier table_id{.ns = ns, .name = "t1"};
+  std::unordered_map<std::string, std::string> table_properties;
+  auto table_result = catalog->CreateTable(table_id, schema, partition_spec, 
sort_order,
+                                           "", table_properties);
+  ASSERT_THAT(table_result, IsOk());
+  auto& table = table_result.value();
+
+  // Verify table
+  EXPECT_EQ(table->name().ns.levels,
+            (std::vector<std::string>{"test_create_table", "apple", "ios"}));
+  EXPECT_EQ(table->name().name, "t1");
+
+  // Verify that creating the same table again fails
+  auto duplicate_result = catalog->CreateTable(table_id, schema, 
partition_spec,
+                                               sort_order, "", 
table_properties);
+  EXPECT_THAT(duplicate_result, IsError(ErrorKind::kAlreadyExists));
+  EXPECT_THAT(duplicate_result,
+              HasErrorMessage("Table already exists: 
test_create_table.apple.ios.t1"));
+}
+
 }  // namespace iceberg::rest
diff --git a/src/iceberg/test/rest_json_internal_test.cc b/src/iceberg/test/rest_json_internal_test.cc
index 67350ebd..c4b7fb55 100644
--- a/src/iceberg/test/rest_json_internal_test.cc
+++ b/src/iceberg/test/rest_json_internal_test.cc
@@ -25,12 +25,53 @@
 
 #include "iceberg/catalog/rest/json_internal.h"
 #include "iceberg/catalog/rest/types.h"
+#include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
+#include "iceberg/sort_order.h"
 #include "iceberg/table_identifier.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/test/matchers.h"
 
 namespace iceberg::rest {
 
+// Helper function to create a simple schema for testing
+static std::shared_ptr<Schema> MakeSimpleSchema() {
+  return std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField(1, "id", int32(), false),     // 
required
+                               SchemaField(2, "data", string(), true)},  // 
optional
+      std::nullopt);
+}
+
+// Helper function to create a simple TableMetadata for testing
+static std::shared_ptr<TableMetadata> MakeSimpleTableMetadata() {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField(1, "id", int32(), false)}, 1);
+  return std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = 2,
+      .table_uuid = "test-uuid-1234",
+      .location = "s3://bucket/test",
+      .last_sequence_number = 0,
+      .last_updated_ms = TimePointMs{},
+      .last_column_id = 1,
+      .schemas = {schema},
+      .current_schema_id = 1,
+      .partition_specs = {PartitionSpec::Unpartitioned()},
+      .default_spec_id = 0,
+      .last_partition_id = 0,
+      .properties = {},
+      .current_snapshot_id = -1,
+      .snapshots = {},
+      .snapshot_log = {},
+      .metadata_log = {},
+      .sort_orders = {SortOrder::Unsorted()},
+      .default_sort_order_id = 0,
+      .refs = {},
+      .statistics = {},
+      .partition_statistics = {},
+      .next_row_id = 0,
+  });
+}
+
 // Test parameter structure for roundtrip tests
 template <typename Model>
 struct JsonRoundTripParam {
@@ -906,4 +947,239 @@ INSTANTIATE_TEST_SUITE_P(
       return info.param.test_name;
     });
 
+// Round-trip cases for CreateTableRequest. Every case pairs a model with the
+// JSON text given in expected_json_str; each one sets "name" and "schema" plus
+// at most one additional field.
+DECLARE_ROUNDTRIP_TEST(CreateTableRequest)
+
+INSTANTIATE_TEST_SUITE_P(
+    CreateTableRequestCases, CreateTableRequestTest,
+    ::testing::Values(
+        // Minimal request with only required fields (name and schema)
+        CreateTableRequestParam{
+            .test_name = "MinimalRequest",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]}})",
+            .model = {.name = "my_table", .schema = MakeSimpleSchema()}},
+        // Request with location
+        CreateTableRequestParam{
+            .test_name = "WithLocation",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]},"location":"s3://bucket/warehouse/my_table"})",
+            .model = {.name = "my_table",
+                      .location = "s3://bucket/warehouse/my_table",
+                      .schema = MakeSimpleSchema()}},
+        // Request with properties
+        CreateTableRequestParam{
+            .test_name = "WithProperties",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]},"properties":{"owner":"alice","version":"1.0"}})",
+            .model = {.name = "my_table",
+                      .schema = MakeSimpleSchema(),
+                      .properties = {{"owner", "alice"}, {"version", "1.0"}}}},
+        // Request with stage_create = true
+        CreateTableRequestParam{
+            .test_name = "WithStageCreate",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]},"stage-create":true})",
+            .model = {.name = "my_table",
+                      .schema = MakeSimpleSchema(),
+                      .stage_create = true}},
+        // Request with partition_spec (unpartitioned)
+        CreateTableRequestParam{
+            .test_name = "WithUnpartitionedSpec",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]},"partition-spec":{"spec-id":0,"fields":[]}})",
+            .model = {.name = "my_table",
+                      .schema = MakeSimpleSchema(),
+                      .partition_spec = PartitionSpec::Unpartitioned()}},
+        // Request with write_order (unsorted)
+        CreateTableRequestParam{
+            .test_name = "WithUnsortedOrder",
+            .expected_json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true},{"id":2,"name":"data","type":"string","required":false}]},"write-order":{"order-id":0,"fields":[]}})",
+            .model = {.name = "my_table",
+                      .schema = MakeSimpleSchema(),
+                      .write_order = SortOrder::Unsorted()}}),
+    // Name each generated test after the case's test_name.
+    [](const ::testing::TestParamInfo<CreateTableRequestParam>& info) {
+      return info.param.test_name;
+    });
+
+// Deserialize-only cases for CreateTableRequest. All three cases feed the same
+// JSON, which carries only "name" and "schema"; they differ in which omitted
+// optional field the case name and expected model call out.
+DECLARE_DESERIALIZE_TEST(CreateTableRequest)
+
+INSTANTIATE_TEST_SUITE_P(
+    CreateTableRequestDeserializeCases, CreateTableRequestDeserializeTest,
+    ::testing::Values(
+        // Location field is missing (should deserialize to empty string)
+        CreateTableRequestDeserializeParam{
+            .test_name = "MissingLocation",
+            .json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true}]}})",
+            .expected_model = {.name = "my_table",
+                               .schema = std::make_shared<Schema>(
+                                   std::vector<SchemaField>{
+                                       SchemaField(1, "id", int32(), false)},  // required
+                                   std::nullopt)}},
+        // stage-create field is missing (should default to false)
+        CreateTableRequestDeserializeParam{
+            .test_name = "MissingStageCreate",
+            .json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true}]}})",
+            .expected_model = {.name = "my_table",
+                               .schema = std::make_shared<Schema>(
+                                   std::vector<SchemaField>{
+                                       SchemaField(1, "id", int32(), false)},  // required
+                                   std::nullopt),
+                               .stage_create = false}},
+        // Properties field is missing (should deserialize to empty map)
+        CreateTableRequestDeserializeParam{
+            .test_name = "MissingProperties",
+            .json_str =
+                R"({"name":"my_table","schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true}]}})",
+            .expected_model = {.name = "my_table",
+                               .schema = std::make_shared<Schema>(
+                                   std::vector<SchemaField>{
+                                       SchemaField(1, "id", int32(), false)},  // required
+                                   std::nullopt)}}),
+    // Name each generated test after the case's test_name.
+    [](const ::testing::TestParamInfo<CreateTableRequestDeserializeParam>& info) {
+      return info.param.test_name;
+    });
+
+// Invalid-input cases for CreateTableRequest. Each case pairs a JSON document
+// with the error message text expected for it.
+DECLARE_INVALID_TEST(CreateTableRequest)
+
+INSTANTIATE_TEST_SUITE_P(
+    CreateTableRequestInvalidCases, CreateTableRequestInvalidTest,
+    ::testing::Values(
+        // Missing required name field
+        CreateTableRequestInvalidParam{
+            .test_name = "MissingName",
+            .invalid_json_str =
+                R"({"schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true}]}})",
+            .expected_error_message = "Missing 'name'"},
+        // Missing required schema field
+        CreateTableRequestInvalidParam{.test_name = "MissingSchema",
+                                       .invalid_json_str = R"({"name":"my_table"})",
+                                       .expected_error_message = "Missing 'schema'"},
+        // Empty JSON object: both required fields are absent; the expected
+        // message is the one for 'name'
+        CreateTableRequestInvalidParam{.test_name = "EmptyJson",
+                                       .invalid_json_str = R"({})",
+                                       .expected_error_message = "Missing 'name'"},
+        // Wrong type for name field
+        CreateTableRequestInvalidParam{
+            .test_name = "WrongNameType",
+            .invalid_json_str =
+                R"({"name":123,"schema":{"type":"struct","fields":[{"id":1,"name":"id","type":"int","required":true}]}})",
+            .expected_error_message = "type must be string, but is number"},
+        // Wrong type for schema field (a JSON string instead of an object)
+        CreateTableRequestInvalidParam{
+            .test_name = "WrongSchemaType",
+            .invalid_json_str = R"({"name":"my_table","schema":"invalid"})",
+            .expected_error_message = "Unknown primitive type: invalid"}),
+    // Name each generated test after the case's test_name.
+    [](const ::testing::TestParamInfo<CreateTableRequestInvalidParam>& info) {
+      return info.param.test_name;
+    });
+
+DECLARE_ROUNDTRIP_TEST(LoadTableResult)
+
+INSTANTIATE_TEST_SUITE_P(
+    LoadTableResultCases, LoadTableResultTest,
+    ::testing::Values(
+        // Minimal case - only required metadata field
+        LoadTableResultParam{
+            .test_name = "MinimalMetadata",
+            .expected_json_str =
+                
R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot
 [...]
+            .model = {.metadata = MakeSimpleTableMetadata()}},
+        // With metadata location
+        LoadTableResultParam{
+            .test_name = "WithMetadataLocation",
+            .expected_json_str =
+                
R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot
 [...]
+            .model = {.metadata_location = "s3://bucket/metadata/v1.json",
+                      .metadata = MakeSimpleTableMetadata()}},
+        // With config
+        LoadTableResultParam{
+            .test_name = "WithConfig",
+            .expected_json_str =
+                
R"({"config":{"warehouse":"s3://bucket/warehouse"},"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"i
 [...]
+            .model = {.metadata = MakeSimpleTableMetadata(),
+                      .config = {{"warehouse", "s3://bucket/warehouse"}}}},
+        // With both metadata location and config
+        LoadTableResultParam{
+            .test_name = "WithMetadataLocationAndConfig",
+            .expected_json_str =
+                
R"({"config":{"foo":"bar","warehouse":"s3://bucket/warehouse"},"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":tr
 [...]
+            .model = {.metadata_location = "s3://bucket/metadata/v1.json",
+                      .metadata = MakeSimpleTableMetadata(),
+                      .config = {{"warehouse", "s3://bucket/warehouse"},
+                                 {"foo", "bar"}}}}),
+    [](const ::testing::TestParamInfo<LoadTableResultParam>& info) {
+      return info.param.test_name;
+    });
+
+DECLARE_DESERIALIZE_TEST(LoadTableResult)
+
+INSTANTIATE_TEST_SUITE_P(
+    LoadTableResultDeserializeCases, LoadTableResultDeserializeTest,
+    ::testing::Values(
+        // Minimal metadata - tests basic deserialization
+        LoadTableResultDeserializeParam{
+            .test_name = "MinimalMetadata",
+            .json_str =
+                
R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}}})",
+            .expected_model = {.metadata = MakeSimpleTableMetadata()}},
+        // With metadata location
+        LoadTableResultDeserializeParam{
+            .test_name = "WithMetadataLocation",
+            .json_str =
+                
R"({"metadata-location":"s3://bucket/metadata/v1.json","metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default
 [...]
+            .expected_model = {.metadata_location = 
"s3://bucket/metadata/v1.json",
+                               .metadata = MakeSimpleTableMetadata()}},
+        // With config
+        LoadTableResultDeserializeParam{
+            .test_name = "WithConfig",
+            .json_str =
+                
R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"config":{"wareh
 [...]
+            .expected_model = {.metadata = MakeSimpleTableMetadata(),
+                               .config = {{"warehouse", 
"s3://bucket/warehouse"}}}}),
+    [](const ::testing::TestParamInfo<LoadTableResultDeserializeParam>& info) {
+      return info.param.test_name;
+    });
+
+// Invalid-input cases for LoadTableResult. Each case pairs a JSON document with
+// the error message text expected for it.
+DECLARE_INVALID_TEST(LoadTableResult)
+
+INSTANTIATE_TEST_SUITE_P(
+    LoadTableResultInvalidCases, LoadTableResultInvalidTest,
+    ::testing::Values(
+        // Missing required metadata field
+        LoadTableResultInvalidParam{.test_name = "MissingMetadata",
+                                    .invalid_json_str = R"({})",
+                                    .expected_error_message = "Missing 'metadata'"},
+        // Null metadata field (expected to be reported like a missing field)
+        LoadTableResultInvalidParam{.test_name = "NullMetadata",
+                                    .invalid_json_str = R"({"metadata":null})",
+                                    .expected_error_message = "Missing 'metadata'"},
+        // Wrong type for metadata field
+        LoadTableResultInvalidParam{
+            .test_name = "WrongMetadataType",
+            .invalid_json_str = R"({"metadata":"invalid"})",
+            .expected_error_message = "Cannot parse metadata from a non-object"},
+        // Wrong type for metadata-location field
+        LoadTableResultInvalidParam{
+            .test_name = "WrongMetadataLocationType",
+            .invalid_json_str =
+                R"({"metadata-location":123,"metadata":{"format-version":2,"table-uuid":"test","location":"s3://test","last-sequence-number":0,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"default-spec-id":0,"last-partition-id":0,"default-sort-order-id":0}})",
+            .expected_error_message = "type must be string, but is number"},
+        // Wrong type for config field
+        LoadTableResultInvalidParam{
+            .test_name = "WrongConfigType",
+            .invalid_json_str =
+                R"({"config":"invalid","metadata":{"format-version":2,"table-uuid":"test","location":"s3://test","last-sequence-number":0,"last-column-id":1,"last-updated-ms":0,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0}})",
+            .expected_error_message = "type must be object, but is string"},
+        // Invalid metadata content
+        LoadTableResultInvalidParam{
+            .test_name = "InvalidMetadataContent",
+            .invalid_json_str = R"({"metadata":{"format-version":"invalid"}})",
+            .expected_error_message = "type must be number, but is string"}),
+    // Name each generated test after the case's test_name.
+    [](const ::testing::TestParamInfo<LoadTableResultInvalidParam>& info) {
+      return info.param.test_name;
+    });
+
 }  // namespace iceberg::rest
diff --git a/src/iceberg/test/std_io.h b/src/iceberg/test/std_io.h
new file mode 100644
index 00000000..3b58267d
--- /dev/null
+++ b/src/iceberg/test/std_io.h
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <filesystem>
+#include <fstream>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <string_view>
+
+#include "iceberg/file_io.h"
+#include "iceberg/result.h"
+
+namespace iceberg::test {
+
+/// \brief Simple local filesystem FileIO implementation for testing
+///
+/// This class provides a basic FileIO implementation that reads and writes
+/// files to the local filesystem using standard C++ file streams. All streams
+/// are opened with std::ios::binary.
+class StdFileIO : public FileIO {
+ public:
+  /// \brief Read a file from the local filesystem.
+  ///
+  /// \param file_location path of the file to read
+  /// \param length if set, exactly this many bytes are read from the start of
+  /// the file and a shorter file is reported as an error; if unset, the whole
+  /// file is read
+  /// \return the bytes read, or an IOError if the file cannot be opened or read
+  Result<std::string> ReadFile(const std::string& file_location,
+                               std::optional<size_t> length) override {
+    std::ifstream file(file_location, std::ios::binary);
+    if (!file.is_open()) {
+      return IOError("Failed to open file for reading: {}", file_location);
+    }
+
+    if (length.has_value()) {
+      std::string content(length.value(), '\0');
+      file.read(content.data(), length.value());
+      // read() sets failbit when fewer than the requested bytes are available.
+      if (!file) {
+        return IOError("Failed to read {} bytes from file: {}", length.value(),
+                       file_location);
+      }
+      return content;
+    } else {
+      std::stringstream buffer;
+      buffer << file.rdbuf();
+      if (!file && !file.eof()) {
+        return IOError("Failed to read file: {}", file_location);
+      }
+      return buffer.str();
+    }
+  }
+
+  /// \brief Write content to a file, creating missing parent directories.
+  ///
+  /// \param file_location path of the file to write
+  /// \param content bytes to write
+  /// \return an IOError if the parent directories cannot be created, the file
+  /// cannot be opened, or the data cannot be written
+  Status WriteFile(const std::string& file_location, std::string_view content) override {
+    // Create parent directories if they don't exist
+    std::filesystem::path path(file_location);
+    if (path.has_parent_path()) {
+      std::error_code ec;
+      std::filesystem::create_directories(path.parent_path(), ec);
+      if (ec) {
+        return IOError("Failed to create parent directories for: {} ({})", file_location,
+                       ec.message());
+      }
+    }
+
+    std::ofstream file(file_location, std::ios::binary);
+    if (!file.is_open()) {
+      return IOError("Failed to open file for writing: {}", file_location);
+    }
+
+    file.write(content.data(), content.size());
+    // The stream buffers its output, so a write failure may only surface once
+    // the buffer reaches the file. Flush before inspecting the stream state so
+    // such a failure is reported here rather than lost when the stream is
+    // destroyed.
+    file.flush();
+    if (!file) {
+      return IOError("Failed to write to file: {}", file_location);
+    }
+
+    return {};
+  }
+
+  /// \brief Delete a file from the local filesystem.
+  ///
+  /// \param file_location path of the file to delete
+  /// \return an IOError if removal fails or if nothing existed at the path
+  Status DeleteFile(const std::string& file_location) override {
+    std::error_code ec;
+    if (!std::filesystem::remove(file_location, ec)) {
+      // remove() returns false both on failure (ec set) and when there was
+      // nothing to remove (ec clear); report the two cases separately.
+      if (ec) {
+        return IOError("Failed to delete file: {} ({})", file_location, ec.message());
+      }
+      return IOError("File does not exist: {}", file_location);
+    }
+    return {};
+  }
+};
+
+}  // namespace iceberg::test

Reply via email to