This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new a012a177 fix(table): ensure partition path is deterministic (#767)
a012a177 is described below
commit a012a1770d70e0df6babcb63e42058f484390602
Author: ferhat elmas <[email protected]>
AuthorDate: Sun Mar 15 05:05:59 2026 +0100
fix(table): ensure partition path is deterministic (#767)
partition record order is expected to match the partition spec's field order,
but maps.Values iterates in an unspecified order and can reorder the values,
causing cross-partition writer reuse, deletion of expected files, etc.
related to #721
Signed-off-by: ferhat elmas <[email protected]>
---
table/pos_delete_partitioned_fanout_writer.go | 12 ++-
table/pos_delete_partitioned_fanout_writer_test.go | 85 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/table/pos_delete_partitioned_fanout_writer.go b/table/pos_delete_partitioned_fanout_writer.go
index 4b3daeaa..f03c7019 100644
--- a/table/pos_delete_partitioned_fanout_writer.go
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -21,8 +21,6 @@ import (
"context"
"fmt"
"iter"
- "maps"
- "slices"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
@@ -132,12 +130,20 @@ func (p *positionDeletePartitionedFanoutWriter)
processBatch(ctx context.Context
}
func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext
partitionContext) (string, error) {
- data :=
partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
spec := p.metadata.PartitionSpecByID(int(partitionContext.specID))
if spec == nil {
return "", fmt.Errorf("unexpected missing partition spec in
metadata for spec id %d", partitionContext.specID)
}
+ data := make(partitionRecord, spec.NumFields())
+ for i, field := range spec.Fields() {
+ val, ok := partitionContext.partitionData[field.FieldID]
+ if !ok {
+ return "", fmt.Errorf("unexpected missing partition
value for field id %d in spec id %d", field.FieldID, partitionContext.specID)
+ }
+ data[i] = val
+ }
+
return spec.PartitionToPath(data, p.schema), nil
}
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go b/table/pos_delete_partitioned_fanout_writer_test.go
index c3a44210..c0a75dab 100644
--- a/table/pos_delete_partitioned_fanout_writer_test.go
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"maps"
+ "slices"
"strings"
"testing"
"time"
@@ -160,6 +161,90 @@ func
TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
}
}
+func TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t
*testing.T) {
+ t.Parallel()
+
+ partitionSpec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{
+ FieldID: 1000,
+ SourceID: 2147483546, // file_path
+ Name: "file_path",
+ Transform: iceberg.IdentityTransform{},
+ },
+ iceberg.PartitionField{
+ FieldID: 1001,
+ SourceID: 2147483545, // pos
+ Name: "pos",
+ Transform: iceberg.IdentityTransform{},
+ },
+ iceberg.PartitionField{
+ FieldID: 1002,
+ SourceID: 2147483545, // pos
+ Name: "pos_bucket",
+ Transform: iceberg.BucketTransform{
+ NumBuckets: 128,
+ },
+ },
+ )
+
+ metadataBuilder, err := NewMetadataBuilder(2)
+ require.NoError(t, err)
+ err = metadataBuilder.AddSchema(iceberg.PositionalDeleteSchema)
+ require.NoError(t, err)
+ err = metadataBuilder.SetCurrentSchemaID(0)
+ require.NoError(t, err)
+ err = metadataBuilder.AddPartitionSpec(&partitionSpec, true)
+ require.NoError(t, err)
+ err = metadataBuilder.SetDefaultSpecID(0)
+ require.NoError(t, err)
+ sortOrder, err := NewSortOrder(1, []SortField{{
+ SourceID: 2147483546,
+ Direction: SortASC,
+ Transform: iceberg.IdentityTransform{},
+ NullOrder: NullsFirst,
+ }})
+ require.NoError(t, err)
+ err = metadataBuilder.AddSortOrder(&sortOrder)
+ require.NoError(t, err)
+ err = metadataBuilder.SetDefaultSortOrderID(1)
+ require.NoError(t, err)
+
+ latestMeta, err := metadataBuilder.Build()
+ require.NoError(t, err)
+
+ writer := &positionDeletePartitionedFanoutWriter{
+ metadata: latestMeta,
+ schema: iceberg.PositionalDeleteSchema,
+ }
+
+ ctx := partitionContext{
+ specID: 0,
+ partitionData: map[int]any{
+ 1000: "file://ns/data-file.parquet",
+ 1001: int64(42),
+ 1002: int32(7),
+ },
+ }
+
+ expectedPath := partitionSpec.PartitionToPath(partitionRecord{
+ ctx.partitionData[1000],
+ ctx.partitionData[1001],
+ ctx.partitionData[1002],
+ }, iceberg.PositionalDeleteSchema)
+
+ // run multiple times to ensure it consistently
+ // produces the same output for the same input context
+ seen := make(map[string]struct{})
+ for range 1024 {
+ path, err := writer.partitionPath(ctx)
+ require.NoError(t, err)
+ seen[path] = struct{}{}
+ }
+
+ require.Lenf(t, seen, 1, "partition path must be stable for the same
input map, got paths: %v", slices.Collect(maps.Keys(seen)))
+ require.Contains(t, seen, expectedPath)
+}
+
func onlyContext(ctx context.Context, _ func()) context.Context {
return ctx
}