twuebi commented on code in PR #558:
URL: https://github.com/apache/iceberg-go/pull/558#discussion_r2334555595


##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+       p.id = id
+}

Review Comment:
   removed, only callsite was better served by moving all of it into the new 
method `PartitionSpec.BindToSchema`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to