This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push: new f381393 fix(table): apply transform before comparing lower and upper partition values (#432) f381393 is described below commit f3813937d149b503052dcd1b7820d273f396417a Author: Joseph Daniel <godw...@users.noreply.github.com> AuthorDate: Mon May 19 20:05:07 2025 +0530 fix(table): apply transform before comparing lower and upper partition values (#432) While trying to use the `table.Transaction.AddFiles` API, I was getting this error, which was odd since I have a day transform. Digging into the code, I found that we were comparing without applying the transform. ``` failed to add file to transaction: error encountered during parquet file conversion: cannot infer partition value from parquet metadata as there is more than one value for partition field: date_partition. (low: 1733183899000000, high: 1733183942000000) ``` This PR ensures that we first apply the transform and then do the comparison. Ref: https://apache-iceberg.slack.com/archives/C05J3MJ42BD/p1747319038338949 --- table/internal/utils.go | 20 ++++++---- table/internal/utils_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/table/internal/utils.go b/table/internal/utils.go index eebe40c..82363e0 100644 --- a/table/internal/utils.go +++ b/table/internal/utils.go @@ -216,18 +216,22 @@ func (d *DataFileStatistics) PartitionValue(field iceberg.PartitionField, sc *ic field.Name, field.Transform)) } - lowerVal, upperVal := agg.Min(), agg.Max() - if !lowerVal.Equals(upperVal) { - panic(fmt.Errorf("cannot infer partition value from parquet metadata as there is more than one value for partition field: %s. 
(low: %s, high: %s)", - field.Name, lowerVal, upperVal)) - } + lowerRec := must(PartitionRecordValue(field, agg.Min(), sc)) + upperRec := must(PartitionRecordValue(field, agg.Max(), sc)) + + lowerT := field.Transform.Apply(lowerRec) + upperT := field.Transform.Apply(upperRec) - val := field.Transform.Apply(must(PartitionRecordValue(field, lowerVal, sc))) - if !val.Valid { + if !lowerT.Valid || !upperT.Valid { return nil } - return val.Val.Any() + if !lowerT.Val.Equals(upperT.Val) { + panic(fmt.Errorf("cannot infer partition value from parquet metadata as there is more than one value for partition field: %s. (low: %s, high: %s)", + field.Name, lowerT.Val, upperT.Val)) + } + + return lowerT.Val.Any() } func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64) iceberg.DataFile { diff --git a/table/internal/utils_test.go b/table/internal/utils_test.go index 60be46b..2466444 100644 --- a/table/internal/utils_test.go +++ b/table/internal/utils_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table/internal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,3 +97,93 @@ func TestMapExecFinish(t *testing.T) { time.Second, 10*time.Millisecond, ) } + +type mockStatsAgg struct { + min, max iceberg.Literal +} + +var _ internal.StatsAgg = (*mockStatsAgg)(nil) + +func (m *mockStatsAgg) Min() iceberg.Literal { return m.min } +func (m *mockStatsAgg) Max() iceberg.Literal { return m.max } +func (m *mockStatsAgg) Update(stats interface{ HasMinMax() bool }) {} +func (m *mockStatsAgg) MinAsBytes() ([]byte, error) { return nil, nil } +func (m *mockStatsAgg) MaxAsBytes() ([]byte, error) { return nil, nil } + +func TestPartitionValue_LinearTransforms(t *testing.T) { + schema := iceberg.NewSchema(1, iceberg.NestedField{ + ID: 1, + Name: "ts_col", + Type: iceberg.TimestampType{}, + }) + + cases := []struct 
{ + name string + transform iceberg.Transform + min time.Time + max time.Time + }{ + { + name: "day", + transform: iceberg.DayTransform{}, + min: time.Date(2023, 5, 15, 0, 0, 0, 0, time.UTC), + max: time.Date(2023, 5, 15, 23, 59, 59, 0, time.UTC), + }, + { + name: "month", + transform: iceberg.MonthTransform{}, + min: time.Date(2023, 5, 1, 0, 0, 0, 0, time.UTC), + max: time.Date(2023, 5, 31, 23, 59, 59, 0, time.UTC), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + minLit := iceberg.NewLiteral(iceberg.Timestamp(tc.min.UnixMicro())) + maxLit := iceberg.NewLiteral(iceberg.Timestamp(tc.max.UnixMicro())) + + partitionField := iceberg.PartitionField{ + SourceID: 1, + FieldID: 100, + Name: tc.name + "_part", + Transform: tc.transform, + } + + stats := internal.DataFileStatistics{ + ColAggs: map[int]internal.StatsAgg{ + 1: &mockStatsAgg{min: minLit, max: maxLit}, + }, + } + + minLitOpt := iceberg.Optional[iceberg.Literal]{Val: minLit, Valid: true} + expected := tc.transform.Apply(minLitOpt).Val.Any() + got := stats.PartitionValue(partitionField, schema) + assert.Equal(t, expected, got) + }) + } +} + +func TestPartitionValue_MismatchPanics(t *testing.T) { + schema := iceberg.NewSchema(1, iceberg.NestedField{ + ID: 1, + Name: "ts_col", + Type: iceberg.TimestampType{}, + }) + partitionField := iceberg.PartitionField{ + SourceID: 1, + FieldID: 100, + Name: "date_part", + Transform: iceberg.DayTransform{}, + } + + day1 := iceberg.NewLiteral(iceberg.Timestamp(time.Date(2023, 5, 15, 12, 0, 0, 0, time.UTC).UnixMicro())) + day2 := iceberg.NewLiteral(iceberg.Timestamp(time.Date(2023, 5, 16, 12, 0, 0, 0, time.UTC).UnixMicro())) + + stats := internal.DataFileStatistics{ + ColAggs: map[int]internal.StatsAgg{ + 1: &mockStatsAgg{min: day1, max: day2}, + }, + } + + assert.Panics(t, func() { stats.PartitionValue(partitionField, schema) }) +}