This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3d36a9eb feat(rest): implement stage-create table (#485)
3d36a9eb is described below
commit 3d36a9eb88f91f14597928fb3b27d0e5f9465ae8
Author: Feiyang Li <[email protected]>
AuthorDate: Tue Jan 6 18:01:12 2026 +0800
feat(rest): implement stage-create table (#485)
---
src/iceberg/catalog/rest/rest_catalog.cc | 47 ++++++++++++++++++++-----------
src/iceberg/catalog/rest/rest_catalog.h | 6 ++++
src/iceberg/catalog/rest/type_fwd.h | 1 +
src/iceberg/test/rest_catalog_test.cc | 48 ++++++++++++++++++++++++++++++++
4 files changed, 85 insertions(+), 17 deletions(-)
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc
index a799d69a..5ab6e591 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -44,6 +44,7 @@
#include "iceberg/table.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
+#include "iceberg/transaction.h"
#include "iceberg/util/macros.h"
namespace iceberg::rest {
@@ -274,11 +275,11 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
return result;
}
-Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
+Result<LoadTableResult> RestCatalog::CreateTableInternal(
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
- const std::unordered_map<std::string, std::string>& properties) {
+ const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
@@ -288,7 +289,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
.schema = schema,
.partition_spec = spec,
.write_order = order,
- .stage_create = false,
+ .stage_create = stage_create,
.properties = properties,
};
@@ -298,10 +299,19 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
client_->Post(path, json_request, /*headers=*/{},
*TableErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
- ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
- return Table::Make(identifier, load_result.metadata,
- std::move(load_result.metadata_location), file_io_,
- shared_from_this());
+ return LoadTableResultFromJson(json);
+}
+
+Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
+ const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+ const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+ const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties) {
+ ICEBERG_ASSIGN_OR_RAISE(auto result,
+ CreateTableInternal(identifier, schema, spec, order, location,
+ properties, /*stage_create=*/false));
+ return Table::Make(identifier, std::move(result.metadata),
+ std::move(result.metadata_location), file_io_, shared_from_this());
}
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -335,13 +345,19 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
}
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
- [[maybe_unused]] const TableIdentifier& identifier,
- [[maybe_unused]] const std::shared_ptr<Schema>& schema,
- [[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
- [[maybe_unused]] const std::shared_ptr<SortOrder>& order,
- [[maybe_unused]] const std::string& location,
- [[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
- return NotImplemented("Not implemented");
+ const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+ const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+ const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties) {
+ ICEBERG_ASSIGN_OR_RAISE(auto result,
+ CreateTableInternal(identifier, schema, spec, order, location,
+ properties, /*stage_create=*/true));
+ ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
+ StagedTable::Make(identifier, std::move(result.metadata),
+ std::move(result.metadata_location), file_io_,
+ shared_from_this()));
+ return Transaction::Make(std::move(staged_table), Transaction::Kind::kCreate,
+ /*auto_commit=*/false);
}
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
@@ -393,9 +409,6 @@ Result<std::string> RestCatalog::LoadTableInternal(
}
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
- ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
- ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
-
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h
index 41928cf7..721df29d 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -110,6 +110,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
+ Result<LoadTableResult> CreateTableInternal(
+ const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
+ const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
+ const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties, bool stage_create);
+
std::unique_ptr<RestCatalogProperties> config_;
std::shared_ptr<FileIO> file_io_;
std::unique_ptr<HttpClient> client_;
diff --git a/src/iceberg/catalog/rest/type_fwd.h b/src/iceberg/catalog/rest/type_fwd.h
index e7fddb91..9a57c11b 100644
--- a/src/iceberg/catalog/rest/type_fwd.h
+++ b/src/iceberg/catalog/rest/type_fwd.h
@@ -25,6 +25,7 @@
namespace iceberg::rest {
struct ErrorResponse;
+struct LoadTableResult;
class Endpoint;
class ErrorHandler;
diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc
index 7f04de0a..f17b9391 100644
--- a/src/iceberg/test/rest_catalog_test.cc
+++ b/src/iceberg/test/rest_catalog_test.cc
@@ -52,6 +52,7 @@
#include "iceberg/test/std_io.h"
#include "iceberg/test/test_resource.h"
#include "iceberg/test/util/docker_compose_util.h"
+#include "iceberg/transaction.h"
namespace iceberg::rest {
@@ -639,4 +640,51 @@ TEST_F(RestCatalogIntegrationTest, RegisterTable) {
EXPECT_NE(table->name(), registered_table->name());
}
+TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
+ auto catalog_result = CreateCatalog();
+ ASSERT_THAT(catalog_result, IsOk());
+ auto& catalog = catalog_result.value();
+
+ // Create namespace
+ Namespace ns{.levels = {"test_stage_create"}};
+ auto status = catalog->CreateNamespace(ns, {});
+ EXPECT_THAT(status, IsOk());
+
+ // Stage create table
+ auto schema = CreateDefaultSchema();
+ auto partition_spec = PartitionSpec::Unpartitioned();
+ auto sort_order = SortOrder::Unsorted();
+
+ TableIdentifier table_id{.ns = ns, .name = "staged_table"};
+ std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
+ auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec,
+ sort_order, "", table_properties);
+ ASSERT_THAT(txn_result, IsOk());
+ auto& txn = txn_result.value();
+
+ // Verify the staged table in transaction
+ EXPECT_NE(txn->table(), nullptr);
+ EXPECT_EQ(txn->table()->name(), table_id);
+
+ // Table should NOT exist in catalog yet (staged but not committed)
+ auto exists_result = catalog->TableExists(table_id);
+ ASSERT_THAT(exists_result, IsOk());
+ EXPECT_FALSE(exists_result.value());
+
+ // Commit the transaction
+ auto commit_result = txn->Commit();
+ ASSERT_THAT(commit_result, IsOk());
+ auto& committed_table = commit_result.value();
+
+ // Verify table now exists
+ exists_result = catalog->TableExists(table_id);
+ ASSERT_THAT(exists_result, IsOk());
+ EXPECT_TRUE(exists_result.value());
+
+ // Verify table properties
+ EXPECT_EQ(committed_table->name(), table_id);
+ auto& props = committed_table->metadata()->properties.configs();
+ EXPECT_EQ(props.at("key1"), "value1");
+}
+
} // namespace iceberg::rest