wgtmac commented on code in PR #416:
URL: https://github.com/apache/iceberg-cpp/pull/416#discussion_r2646701592
##########
src/iceberg/table_metadata.h:
##########
@@ -124,6 +124,12 @@ struct ICEBERG_EXPORT TableMetadata {
/// A `long` higher than all assigned row IDs
int64_t next_row_id;
+ static Result<std::unique_ptr<TableMetadata>> Make(
+ const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
Review Comment:
Do you think we should rename functions below to `GetSchema`, `GetSnapshot`,
etc. so we don't need to write `iceberg::` prefix here and elsewhere?
##########
src/iceberg/table_metadata.cc:
##########
@@ -65,6 +68,106 @@ std::string ToString(const MetadataLogEntry& entry) {
entry.metadata_file);
}
+Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
+ const Schema&
fresh_schema,
+ const Schema&
base_schema,
+ const PartitionSpec&
spec) {
+ int32_t last_assigned_field_id = -1;
+ std::vector<PartitionField> partition_fields;
+ for (auto& field : spec.fields()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+ base_schema.FindColumnNameById(field.field_id()));
+ if (!source_name.has_value()) [[unlikely]] {
+ return InvalidSchema("Partition field id {} does not exist in the
schema",
+ field.field_id());
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+ fresh_schema.FindFieldByName(source_name.value()));
+ if (!fresh_field.has_value()) [[unlikely]] {
+ return InvalidSchema("Partition field {} does not exist in the schema",
+ source_name.value());
+ }
+ partition_fields.emplace_back(
+ fresh_field.value().get().field_id(), ++last_assigned_field_id,
+ std::string(fresh_field.value().get().name()), field.transform());
+ }
+ return PartitionSpec::Make(fresh_schema, spec_id,
std::move(partition_fields), false,
+ last_assigned_field_id);
+}
+
+Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id, const
Schema& schema,
+ const SortOrder& order) {
+ if (order.is_unsorted()) {
+ return SortOrder::Unsorted();
+ }
+
+ std::vector<SortField> fresh_fields;
+ for (const auto& field : order.fields()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+ schema.FindColumnNameById(field.source_id()));
+ if (!source_name.has_value()) {
+ return InvalidSchema("Unable to find source field with ID {} in the old
schema",
+ field.source_id());
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+ schema.FindFieldByName(source_name.value()));
+ if (!fresh_field.has_value()) {
+ return InvalidSchema("Unable to find field '{}' in the new schema",
+ source_name.value());
+ }
+
+ int32_t new_source_id = fresh_field.value().get().field_id();
+ fresh_fields.emplace_back(new_source_id, field.transform(),
field.direction(),
+ field.null_order());
+ }
+
+ return SortOrder::Make(order_id, std::move(fresh_fields));
+}
+
+Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
+ const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+ const iceberg::SortOrder& sort_order, const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties, int
format_version) {
+ for (const auto& [key, _] : properties) {
+ if (TableProperties::reserved_properties().contains(key)) {
+ return InvalidArgument(
+ "Table properties should not contain reserved properties, but got
{}", key);
+ }
+ }
+
+ // Reassign all column ids to ensure consistency
+ std::atomic<int32_t> last_column_id = 0;
+ auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema,
+ AssignFreshIds(Schema::kInitialSchemaId, schema,
next_id));
+
+ // rebuild the partition spec using the new column ids
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto fresh_spec,
+ FreshPartitionSpec(PartitionSpec::kInitialSpecId, *fresh_schema, schema,
spec));
+
+ // rebuild the sort order using the new column ids
+ int32_t fresh_order_id =
+ sort_order.is_unsorted() ? sort_order.order_id() :
SortOrder::kInitialSortOrderId;
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_order,
+ FreshSortOrder(fresh_order_id, *fresh_schema,
sort_order))
+
+ // Validata the metrics configuration.
+ ICEBERG_RETURN_UNEXPECTED(
+ MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));
+
+ // TODO(anyone) Validate commit properties
+
+ return TableMetadataBuilder::BuildFromEmpty(format_version)
+ ->SetLocation(location)
+ .AddSchema(std::move(fresh_schema))
Review Comment:
`AddXXX` does not set it as the default. We may need to use
`SetCurrentSchema`, `SetDefaultPartitionSpec` and `SetDefaultSortOrder` here.
##########
src/iceberg/table_identifier.h:
##########
@@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace {
std::vector<std::string> levels;
bool operator==(const Namespace& other) const { return levels ==
other.levels; }
+
+ std::string ToString() const {
+ std::ostringstream oss;
+ for (size_t i = 0; i < levels.size(); ++i) {
Review Comment:
`formatter_internal.h` has a `FormatRange` helper that implements this. Using
it requires adding a `table_identifier.cc`, since we cannot expose the internal
header here.
##########
src/iceberg/table_metadata.cc:
##########
@@ -65,6 +68,106 @@ std::string ToString(const MetadataLogEntry& entry) {
entry.metadata_file);
}
+Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
+ const Schema&
fresh_schema,
+ const Schema&
base_schema,
+ const PartitionSpec&
spec) {
+ int32_t last_assigned_field_id = -1;
+ std::vector<PartitionField> partition_fields;
+ for (auto& field : spec.fields()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+ base_schema.FindColumnNameById(field.field_id()));
+ if (!source_name.has_value()) [[unlikely]] {
+ return InvalidSchema("Partition field id {} does not exist in the
schema",
+ field.field_id());
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+ fresh_schema.FindFieldByName(source_name.value()));
+ if (!fresh_field.has_value()) [[unlikely]] {
+ return InvalidSchema("Partition field {} does not exist in the schema",
+ source_name.value());
+ }
+ partition_fields.emplace_back(
+ fresh_field.value().get().field_id(), ++last_assigned_field_id,
+ std::string(fresh_field.value().get().name()), field.transform());
+ }
+ return PartitionSpec::Make(fresh_schema, spec_id,
std::move(partition_fields), false,
+ last_assigned_field_id);
+}
+
+Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id, const
Schema& schema,
+ const SortOrder& order) {
+ if (order.is_unsorted()) {
+ return SortOrder::Unsorted();
+ }
+
+ std::vector<SortField> fresh_fields;
+ for (const auto& field : order.fields()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto source_name,
+ schema.FindColumnNameById(field.source_id()));
+ if (!source_name.has_value()) {
+ return InvalidSchema("Unable to find source field with ID {} in the old
schema",
+ field.source_id());
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
+ schema.FindFieldByName(source_name.value()));
+ if (!fresh_field.has_value()) {
+ return InvalidSchema("Unable to find field '{}' in the new schema",
+ source_name.value());
+ }
+
+ int32_t new_source_id = fresh_field.value().get().field_id();
+ fresh_fields.emplace_back(new_source_id, field.transform(),
field.direction(),
+ field.null_order());
+ }
+
+ return SortOrder::Make(order_id, std::move(fresh_fields));
+}
+
+Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
+ const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
+ const iceberg::SortOrder& sort_order, const std::string& location,
+ const std::unordered_map<std::string, std::string>& properties, int
format_version) {
+ for (const auto& [key, _] : properties) {
+ if (TableProperties::reserved_properties().contains(key)) {
+ return InvalidArgument(
+ "Table properties should not contain reserved properties, but got
{}", key);
+ }
+ }
+
+ // Reassign all column ids to ensure consistency
+ std::atomic<int32_t> last_column_id = 0;
Review Comment:
Why do we need to use atomic here?
##########
src/iceberg/table_metadata.cc:
##########
@@ -408,6 +518,10 @@ class TableMetadataBuilder::Impl {
const TableMetadata* base() const { return base_; }
const TableMetadata& metadata() const { return metadata_; }
+ void SetLocation(std::string_view location) {
+ metadata_.location = std::string(location);
Review Comment:
We might need to follow the Java impl to validate the input location and
then add a `TableUpdate` to the changes list.
```java
public Builder setLocation(String newLocation) {
if (location != null && location.equals(newLocation)) {
return this;
}
this.location = newLocation;
changes.add(new MetadataUpdate.SetLocation(newLocation));
return this;
}
```
##########
src/iceberg/table_identifier.h:
##########
@@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier {
}
return {};
}
+
+ std::string ToString() const { return ns.ToString() + '.' + name; }
};
} // namespace iceberg
+
+namespace std {
+
+template <>
+struct formatter<iceberg::Namespace> : std::formatter<std::string> {
Review Comment:
You don't need to explicitly add these as they are automatically supported
by including `iceberg/util/formatter.h`.
##########
src/iceberg/table_requirements.cc:
##########
@@ -134,4 +136,24 @@ Result<std::vector<std::unique_ptr<TableRequirement>>>
TableRequirements::ForUpd
return context.Build();
}
+Result<bool> TableRequirements::IsCreate(
+ const std::vector<std::unique_ptr<TableRequirement>>& requirements) {
+ bool is_create = std::ranges::any_of(requirements, [](const auto& req) {
+ return dynamic_cast<table::AssertDoesNotExist*>(req.get()) != nullptr;
Review Comment:
Please add an enum type to `TableRequirement` like `TableUpdate::kind()` and
use it here.
##########
src/iceberg/table_metadata.cc:
##########
@@ -50,12 +52,73 @@
#include "iceberg/util/gzip_internal.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
+#include "iceberg/util/property_util.h"
+#include "iceberg/util/type_util.h"
#include "iceberg/util/uuid.h"
namespace iceberg {
namespace {
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
constexpr int32_t kLastAdded = -1;
constexpr std::string_view kMetadataFolderName = "metadata";
+
+// TableMetadata private static methods
+Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec(
Review Comment:
I think `FreshPartitionSpec` and `FreshSortOrder` should also be called in
the `SetDefaultPartitionSpec` and `SetDefaultSortOrder`, respectively.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]