This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f381393  fix(table): apply transform before comparing lower and upper partition values (#432)
f381393 is described below

commit f3813937d149b503052dcd1b7820d273f396417a
Author: Joseph Daniel <godw...@users.noreply.github.com>
AuthorDate: Mon May 19 20:05:07 2025 +0530

    fix(table): apply transform before comparing lower and upper partition values (#432)
    
    While trying to use the `table.Transaction.AddFiles` API, I was getting
    this error, which was odd since I have a day transform. Digging into the
    code, I found that we were comparing without applying the transform.
    ```
    failed to add file to transaction: error encountered during parquet file conversion: cannot infer partition value from parquet metadata as there is more than one value for partition field: date_partition. (low: 1733183899000000, high: 1733183942000000)
    ```
    This PR ensures that we first apply the transform and then do the
    comparison.
    
    Ref:
    https://apache-iceberg.slack.com/archives/C05J3MJ42BD/p1747319038338949
---
 table/internal/utils.go      | 20 ++++++----
 table/internal/utils_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 8 deletions(-)

diff --git a/table/internal/utils.go b/table/internal/utils.go
index eebe40c..82363e0 100644
--- a/table/internal/utils.go
+++ b/table/internal/utils.go
@@ -216,18 +216,22 @@ func (d *DataFileStatistics) PartitionValue(field 
iceberg.PartitionField, sc *ic
                        field.Name, field.Transform))
        }
 
-       lowerVal, upperVal := agg.Min(), agg.Max()
-       if !lowerVal.Equals(upperVal) {
-               panic(fmt.Errorf("cannot infer partition value from parquet 
metadata as there is more than one value for partition field: %s. (low: %s, 
high: %s)",
-                       field.Name, lowerVal, upperVal))
-       }
+       lowerRec := must(PartitionRecordValue(field, agg.Min(), sc))
+       upperRec := must(PartitionRecordValue(field, agg.Max(), sc))
+
+       lowerT := field.Transform.Apply(lowerRec)
+       upperT := field.Transform.Apply(upperRec)
 
-       val := field.Transform.Apply(must(PartitionRecordValue(field, lowerVal, 
sc)))
-       if !val.Valid {
+       if !lowerT.Valid || !upperT.Valid {
                return nil
        }
 
-       return val.Val.Any()
+       if !lowerT.Val.Equals(upperT.Val) {
+               panic(fmt.Errorf("cannot infer partition value from parquet 
metadata as there is more than one value for partition field: %s. (low: %s, 
high: %s)",
+                       field.Name, lowerT.Val, upperT.Val))
+       }
+
+       return lowerT.Val.Any()
 }
 
 func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec 
iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64) 
iceberg.DataFile {
diff --git a/table/internal/utils_test.go b/table/internal/utils_test.go
index 60be46b..2466444 100644
--- a/table/internal/utils_test.go
+++ b/table/internal/utils_test.go
@@ -22,6 +22,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/table/internal"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -96,3 +97,93 @@ func TestMapExecFinish(t *testing.T) {
                time.Second, 10*time.Millisecond,
        )
 }
+
+type mockStatsAgg struct {
+       min, max iceberg.Literal
+}
+
+var _ internal.StatsAgg = (*mockStatsAgg)(nil)
+
+func (m *mockStatsAgg) Min() iceberg.Literal                       { return 
m.min }
+func (m *mockStatsAgg) Max() iceberg.Literal                       { return 
m.max }
+func (m *mockStatsAgg) Update(stats interface{ HasMinMax() bool }) {}
+func (m *mockStatsAgg) MinAsBytes() ([]byte, error)                { return 
nil, nil }
+func (m *mockStatsAgg) MaxAsBytes() ([]byte, error)                { return 
nil, nil }
+
+func TestPartitionValue_LinearTransforms(t *testing.T) {
+       schema := iceberg.NewSchema(1, iceberg.NestedField{
+               ID:   1,
+               Name: "ts_col",
+               Type: iceberg.TimestampType{},
+       })
+
+       cases := []struct {
+               name      string
+               transform iceberg.Transform
+               min       time.Time
+               max       time.Time
+       }{
+               {
+                       name:      "day",
+                       transform: iceberg.DayTransform{},
+                       min:       time.Date(2023, 5, 15, 0, 0, 0, 0, time.UTC),
+                       max:       time.Date(2023, 5, 15, 23, 59, 59, 0, 
time.UTC),
+               },
+               {
+                       name:      "month",
+                       transform: iceberg.MonthTransform{},
+                       min:       time.Date(2023, 5, 1, 0, 0, 0, 0, time.UTC),
+                       max:       time.Date(2023, 5, 31, 23, 59, 59, 0, 
time.UTC),
+               },
+       }
+
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       minLit := 
iceberg.NewLiteral(iceberg.Timestamp(tc.min.UnixMicro()))
+                       maxLit := 
iceberg.NewLiteral(iceberg.Timestamp(tc.max.UnixMicro()))
+
+                       partitionField := iceberg.PartitionField{
+                               SourceID:  1,
+                               FieldID:   100,
+                               Name:      tc.name + "_part",
+                               Transform: tc.transform,
+                       }
+
+                       stats := internal.DataFileStatistics{
+                               ColAggs: map[int]internal.StatsAgg{
+                                       1: &mockStatsAgg{min: minLit, max: 
maxLit},
+                               },
+                       }
+
+                       minLitOpt := iceberg.Optional[iceberg.Literal]{Val: 
minLit, Valid: true}
+                       expected := tc.transform.Apply(minLitOpt).Val.Any()
+                       got := stats.PartitionValue(partitionField, schema)
+                       assert.Equal(t, expected, got)
+               })
+       }
+}
+
+func TestPartitionValue_MismatchPanics(t *testing.T) {
+       schema := iceberg.NewSchema(1, iceberg.NestedField{
+               ID:   1,
+               Name: "ts_col",
+               Type: iceberg.TimestampType{},
+       })
+       partitionField := iceberg.PartitionField{
+               SourceID:  1,
+               FieldID:   100,
+               Name:      "date_part",
+               Transform: iceberg.DayTransform{},
+       }
+
+       day1 := iceberg.NewLiteral(iceberg.Timestamp(time.Date(2023, 5, 15, 12, 
0, 0, 0, time.UTC).UnixMicro()))
+       day2 := iceberg.NewLiteral(iceberg.Timestamp(time.Date(2023, 5, 16, 12, 
0, 0, 0, time.UTC).UnixMicro()))
+
+       stats := internal.DataFileStatistics{
+               ColAggs: map[int]internal.StatsAgg{
+                       1: &mockStatsAgg{min: day1, max: day2},
+               },
+       }
+
+       assert.Panics(t, func() { stats.PartitionValue(partitionField, schema) 
})
+}

Reply via email to