zhjwpku commented on code in PR #401: URL: https://github.com/apache/iceberg-cpp/pull/401#discussion_r2619741058
########## src/iceberg/update/update_partition_spec.cc: ########## @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_partition_spec.h" + +#include <format> + +#include "iceberg/catalog.h" +#include "iceberg/expression/term.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" +#include "iceberg/transform.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +UpdatePartitionSpec::UpdatePartitionSpec(TableIdentifier identifier, + std::shared_ptr<Catalog> catalog, + std::shared_ptr<TableMetadata> base) + : identifier_(std::move(identifier)), + catalog_(std::move(catalog)), + base_metadata_(std::move(base)) { + ICEBERG_DCHECK(catalog_, "Catalog is required to construct UpdatePartitionSpec"); + ICEBERG_DCHECK(base_metadata_, + "Base table metadata is required to construct UpdatePartitionSpec"); + format_version_ = base_metadata_->format_version; + + // Get the current/default partition spec + auto 
spec_result = base_metadata_->PartitionSpec(); + if (!spec_result.has_value()) { + AddError(spec_result.error()); + return; + } + spec_ = std::move(spec_result.value()); + + // Get the current schema + auto schema_result = base_metadata_->Schema(); + if (!schema_result.has_value()) { + AddError(schema_result.error()); + return; + } + schema_ = std::move(schema_result.value()); + + last_assigned_partition_id_ = spec_->last_assigned_field_id(); + name_to_field_ = IndexSpecByName(*spec_); + transform_to_field_ = IndexSpecByTransform(*spec_); + + // Check for unknown transforms + for (const auto& field : spec_->fields()) { + if (field.transform()->transform_type() == TransformType::kUnknown) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot update partition spec with unknown transform: {}", + field.ToString())); + return; + } + } +} + +UpdatePartitionSpec::~UpdatePartitionSpec() = default; + +UpdatePartitionSpec& UpdatePartitionSpec::CaseSensitive(bool is_case_sensitive) { + case_sensitive_ = is_case_sensitive; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddNonDefaultSpec() { + set_as_default_ = false; + return *this; +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::string& source_name) { + // Find the source field in the schema + auto field_result = schema_->FindFieldByName(source_name, case_sensitive_); + if (!field_result.has_value()) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot find source field: {}", source_name)); + return *this; + } + + auto field_opt = field_result.value(); + if (!field_opt.has_value()) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot find source field: {}", source_name)); + return *this; + } + + int32_t source_id = field_opt.value().get().field_id(); + return AddFieldInternal(nullptr, source_id, Transform::Identity()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField( + std::shared_ptr<UnboundTerm<BoundReference>> term) { + return 
AddField(std::nullopt, std::move(term)); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField( + std::shared_ptr<UnboundTerm<BoundTransform>> term) { + return AddField(std::nullopt, std::move(term)); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField( + std::optional<std::string> name, std::shared_ptr<UnboundTerm<BoundReference>> term) { + // Bind the term to get the source field + auto bound_result = term->Bind(*schema_, case_sensitive_); + if (!bound_result.has_value()) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot bind term: {}", term->ToString())); + return *this; + } + + auto bound_ref = bound_result.value(); + int32_t source_id = bound_ref->field().field_id(); + + // Reference terms use identity transform + return AddFieldInternal(name ? &name.value() : nullptr, source_id, + Transform::Identity()); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddField( + std::optional<std::string> name, std::shared_ptr<UnboundTerm<BoundTransform>> term) { + // Bind the term to get the source field and transform + auto bound_result = term->Bind(*schema_, case_sensitive_); + if (!bound_result.has_value()) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot bind term: {}", term->ToString())); + return *this; + } + + auto bound_transform = bound_result.value(); + int32_t source_id = bound_transform->reference()->field().field_id(); + auto transform = bound_transform->transform(); + + return AddFieldInternal(name ? 
&name.value() : nullptr, source_id, transform); +} + +UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal( + const std::string* name, int32_t source_id, std::shared_ptr<Transform> transform) { + // Check for duplicate name in added fields + if (name != nullptr) { + auto it = name_to_added_field_.find(*name); + if (it != name_to_added_field_.end()) { + AddError(ErrorKind::kInvalidArgument, + std::format("Cannot add duplicate partition field: {}", *name)); + return *this; + } + } + + TransformKey validation_key{source_id, transform->ToString()}; + + // Check if this field already exists in the current spec + auto existing_it = transform_to_field_.find(validation_key); + if (existing_it != transform_to_field_.end()) { + const auto& existing = existing_it->second; + if (deletes_.contains(existing.field_id()) && *existing.transform() == *transform) { + // If the field was deleted and we're re-adding the same one, just undo the delete + return RewriteDeleteAndAddField(existing, name); + } + + if (deletes_.find(existing.field_id()) == deletes_.end()) { + AddError( + ErrorKind::kInvalidArgument, + std::format( + "Cannot add duplicate partition field for source {} with transform {}, " + "conflicts with {}", + source_id, transform->ToString(), existing.ToString())); + return *this; + } + } + + // Check if already being added + auto added_it = transform_to_added_field_.find(validation_key); + if (added_it != transform_to_added_field_.end()) { + AddError(ErrorKind::kInvalidArgument, + std::format( + "Cannot add duplicate partition field for source {} with transform {}, " + "already added: {}", + source_id, transform->ToString(), added_it->second.ToString())); + return *this; + } + + // Create or recycle the partition field + PartitionField new_field = RecycleOrCreatePartitionField(source_id, transform, name); + + // Generate name if not provided + std::string field_name; + if (name != nullptr) { Review Comment: I think checking `new_field.name().empty()` won't work when 
it finds a recycled partition field whose name is not empty. Am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
