zeroshade commented on code in PR #558:
URL: https://github.com/apache/iceberg-go/pull/558#discussion_r2331159864


##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+       p.id = id
+}

Review Comment:
   if we're adding a `SetID` function, we might as well just make the `ID` 
field a public field instead of having `SetID` and `ID()` methods. That's more 
idiomatic for Go.
   
   That said, I would rather this instead be a `WithID(id int) *PartitionSpec` 
method which returns a copy of the partition spec with the updated ID. That way 
we maintain that the PartitionSpec object itself can still be immutable



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+       p.id = id
+}
+
+func (ps *PartitionSpec) AssignPartitionFieldIds(lastAssignedFieldIDPtr *int) 
error {

Review Comment:
   same argument as above, I'd rather keep a PartitionSpec as an immutable 
object, so my preference is this should return a copy of the PartitionSpec with 
the new IDs



##########
table/metadata.go:
##########
@@ -1234,6 +1359,17 @@ type metadataV2 struct {
        commonMetadata
 }
 
+func initMetadataV2Deser() *metadataV2 {
+       return &metadataV2{
+               LastSeqNum:     -1,
+               commonMetadata: initCommonMetadataForDeserialization(),
+       }
+}
+
+func (m *metadataV2) FormatVersion() int {
+       return 2
+}

Review Comment:
   common metadata already has a `Version` method which returns the 
`FormatVersion` value, why did we need a new method?



##########
table/metadata.go:
##########
@@ -890,6 +960,29 @@ type commonMetadata struct {
        SnapshotRefs       map[string]SnapshotRef  `json:"refs,omitempty"`
 }
 
+func initCommonMetadataForDeserialization() commonMetadata {
+       return commonMetadata{
+               FormatVersion:      0,
+               UUID:               uuid.UUID{},
+               Loc:                "",
+               LastUpdatedMS:      -1,
+               LastColumnId:       -1,
+               SchemaList:         nil,
+               CurrentSchemaID:    -1,
+               Specs:              nil,
+               DefaultSpecID:      -1,
+               LastPartitionID:    nil,
+               Props:              nil,
+               SnapshotList:       nil,
+               CurrentSnapshotID:  nil,
+               SnapshotLog:        nil,
+               MetadataLog:        nil,
+               SortOrderList:      nil,
+               DefaultSortOrderID: 0,
+               SnapshotRefs:       nil,
+       }

Review Comment:
   simplify this by removing everything that is just being set to the zero 
value. i.e.: you only need to include the things that are being set to -1.



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+       p.id = id
+}
+
+func (ps *PartitionSpec) AssignPartitionFieldIds(lastAssignedFieldIDPtr *int) 
error {
+       // This is set_field_ids from iceberg-rust
+       // Already assigned partition ids. If we see one of these during 
iteration,
+       // we skip it.
+       assignedIds := make(map[int]struct{})
+       for _, field := range ps.fields {
+               if field.FieldID != unassignedFieldID {
+                       if _, exists := assignedIds[field.FieldID]; exists {
+                               return fmt.Errorf("duplicate field ID provided: 
%d", field.FieldID)
+                       }
+                       assignedIds[field.FieldID] = struct{}{}
+               }
+       }
+
+       lastAssignedFieldID := ps.LastAssignedFieldID()
+       if lastAssignedFieldIDPtr != nil {
+               lastAssignedFieldID = *lastAssignedFieldIDPtr
+       }

Review Comment:
   instead of taking a pointer, can we just take an `int` and if the value is 
`<= 0` we use the current `LastAssignedFieldID()`?



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)

Review Comment:
   put the remaining arguments on a new line to reduce the line wrap and 
length. Also, we already have a mapping of source-id to the field, so you can 
do 
   
   ```go
   if fields, ok := p.sourceIdToFields[sourceID]; ok {
      for _, f := range fields {
           if f.Transform.Equals(transform) {
               return fmt.Errorf(.....)
           }
       }
   }
   
   return nil
   ```
   
   instead
   
     



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {

Review Comment:
   why not make this a method on `PartitionSpec`?



##########
table/metadata.go:
##########
@@ -1156,6 +1255,18 @@ type metadataV1 struct {
        commonMetadata
 }
 
+func initMetadataV1Deser() *metadataV1 {
+       return &metadataV1{
+               Schema:         nil,
+               Partition:      nil,

Review Comment:
   same comment as above, don't include these if you're just setting them to 
the zero value



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {

Review Comment:
   these need docstring comments to explain why someone would use this instead 
of `NewPartitionSpec` etc.



##########
partitions.go:
##########
@@ -85,6 +87,130 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := addSpecFieldInternal(p, targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func addSpecFieldInternal(p *PartitionSpec, targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       for _, field := range p.fields {
+               if field.SourceID == sourceID && field.Transform.String() == 
transform.String() {
+                       return fmt.Errorf("cannot add redundant partition with 
source id %d and transform %s. A partition with the same source id and 
transform already exists with name: %s", sourceID, transform, field.Name)
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) SetID(id int) {
+       p.id = id
+}
+
+func (ps *PartitionSpec) AssignPartitionFieldIds(lastAssignedFieldIDPtr *int) 
error {
+       // This is set_field_ids from iceberg-rust
+       // Already assigned partition ids. If we see one of these during 
iteration,
+       // we skip it.
+       assignedIds := make(map[int]struct{})
+       for _, field := range ps.fields {
+               if field.FieldID != unassignedFieldID {
+                       if _, exists := assignedIds[field.FieldID]; exists {
+                               return fmt.Errorf("duplicate field ID provided: 
%d", field.FieldID)
+                       }
+                       assignedIds[field.FieldID] = struct{}{}
+               }
+       }

Review Comment:
   shouldn't this get done during initialization or during `NewPartitionSpec` 
instead of here?



##########
table/metadata.go:
##########
@@ -134,6 +134,8 @@ type Metadata interface {
        NameMapping() iceberg.NameMapping
 
        LastSequenceNumber() int64
+
+       FormatVersion() int

Review Comment:
   there's already `Version()` why do we need this also?



##########
table/updates.go:
##########
@@ -553,20 +553,22 @@ func (u *removeSnapshotRefUpdate) Apply(builder 
*MetadataBuilder) error {
 
 type removeSpecUpdate struct {
        baseUpdate
-       SpecIds []int64 `json:"spec-ids"`
+       SpecIds []int `json:"spec-ids"`

Review Comment:
   why this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to