This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a012a177 fix(table): ensure partition path is deterministic (#767)
a012a177 is described below

commit a012a1770d70e0df6babcb63e42058f484390602
Author: ferhat elmas <[email protected]>
AuthorDate: Sun Mar 15 05:05:59 2026 +0100

    fix(table): ensure partition path is deterministic (#767)
    
    partition record order is expected to match the partition spec,
    but maps.Values can change it, causing cross-partition writer reuse,
    deletion of expected files, etc.
    
    related to #721
    
    Signed-off-by: ferhat elmas <[email protected]>
---
 table/pos_delete_partitioned_fanout_writer.go      | 12 ++-
 table/pos_delete_partitioned_fanout_writer_test.go | 85 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/table/pos_delete_partitioned_fanout_writer.go b/table/pos_delete_partitioned_fanout_writer.go
index 4b3daeaa..f03c7019 100644
--- a/table/pos_delete_partitioned_fanout_writer.go
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -21,8 +21,6 @@ import (
        "context"
        "fmt"
        "iter"
-       "maps"
-       "slices"
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
@@ -132,12 +130,20 @@ func (p *positionDeletePartitionedFanoutWriter) processBatch(ctx context.Context
 }
 
 func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext partitionContext) (string, error) {
-       data := partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
        spec := p.metadata.PartitionSpecByID(int(partitionContext.specID))
        if spec == nil {
                return "", fmt.Errorf("unexpected missing partition spec in metadata for spec id %d", partitionContext.specID)
        }
 
+       data := make(partitionRecord, spec.NumFields())
+       for i, field := range spec.Fields() {
+               val, ok := partitionContext.partitionData[field.FieldID]
+               if !ok {
+                       return "", fmt.Errorf("unexpected missing partition value for field id %d in spec id %d", field.FieldID, partitionContext.specID)
+               }
+               data[i] = val
+       }
+
        return spec.PartitionToPath(data, p.schema), nil
 }
 
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go b/table/pos_delete_partitioned_fanout_writer_test.go
index c3a44210..c0a75dab 100644
--- a/table/pos_delete_partitioned_fanout_writer_test.go
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "maps"
+       "slices"
        "strings"
        "testing"
        "time"
@@ -160,6 +161,90 @@ func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
        }
 }
 
+func TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *testing.T) {
+       t.Parallel()
+
+       partitionSpec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{
+                       FieldID:   1000,
+                       SourceID:  2147483546, // file_path
+                       Name:      "file_path",
+                       Transform: iceberg.IdentityTransform{},
+               },
+               iceberg.PartitionField{
+                       FieldID:   1001,
+                       SourceID:  2147483545, // pos
+                       Name:      "pos",
+                       Transform: iceberg.IdentityTransform{},
+               },
+               iceberg.PartitionField{
+                       FieldID:  1002,
+                       SourceID: 2147483545, // pos
+                       Name:     "pos_bucket",
+                       Transform: iceberg.BucketTransform{
+                               NumBuckets: 128,
+                       },
+               },
+       )
+
+       metadataBuilder, err := NewMetadataBuilder(2)
+       require.NoError(t, err)
+       err = metadataBuilder.AddSchema(iceberg.PositionalDeleteSchema)
+       require.NoError(t, err)
+       err = metadataBuilder.SetCurrentSchemaID(0)
+       require.NoError(t, err)
+       err = metadataBuilder.AddPartitionSpec(&partitionSpec, true)
+       require.NoError(t, err)
+       err = metadataBuilder.SetDefaultSpecID(0)
+       require.NoError(t, err)
+       sortOrder, err := NewSortOrder(1, []SortField{{
+               SourceID:  2147483546,
+               Direction: SortASC,
+               Transform: iceberg.IdentityTransform{},
+               NullOrder: NullsFirst,
+       }})
+       require.NoError(t, err)
+       err = metadataBuilder.AddSortOrder(&sortOrder)
+       require.NoError(t, err)
+       err = metadataBuilder.SetDefaultSortOrderID(1)
+       require.NoError(t, err)
+
+       latestMeta, err := metadataBuilder.Build()
+       require.NoError(t, err)
+
+       writer := &positionDeletePartitionedFanoutWriter{
+               metadata: latestMeta,
+               schema:   iceberg.PositionalDeleteSchema,
+       }
+
+       ctx := partitionContext{
+               specID: 0,
+               partitionData: map[int]any{
+                       1000: "file://ns/data-file.parquet",
+                       1001: int64(42),
+                       1002: int32(7),
+               },
+       }
+
+       expectedPath := partitionSpec.PartitionToPath(partitionRecord{
+               ctx.partitionData[1000],
+               ctx.partitionData[1001],
+               ctx.partitionData[1002],
+       }, iceberg.PositionalDeleteSchema)
+
+       // run multiple times to ensure it consistently
+       // produces the same output for the same input context
+       seen := make(map[string]struct{})
+       for range 1024 {
+               path, err := writer.partitionPath(ctx)
+               require.NoError(t, err)
+               seen[path] = struct{}{}
+       }
+
+       require.Lenf(t, seen, 1, "partition path must be stable for the same input map, got paths: %v", slices.Collect(maps.Keys(seen)))
+       require.Contains(t, seen, expectedPath)
+}
+
 func onlyContext(ctx context.Context, _ func()) context.Context {
        return ctx
 }

Reply via email to