This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f71d87c  feat(table): Add files to partitioned tables (#338)
f71d87c is described below

commit f71d87cef43b06cbb4040afb5b73bf17e372b4c8
Author: Matt Topol <[email protected]>
AuthorDate: Tue Mar 18 11:20:34 2025 -0400

    feat(table): Add files to partitioned tables (#338)
    
    Next step in supporting file addition: adds support for adding files
    to partitioned tables and properly sets the partition data on the
    DataFile objects.
---
 literals.go                        |  22 ++-
 table/arrow_utils.go               |  79 +++++---
 table/arrow_utils_internal_test.go |  21 +--
 table/internal/parquet_files.go    |   7 +-
 table/scanner.go                   |  23 +++
 table/snapshots.go                 |  32 ++++
 table/table.go                     |  82 +++++++++
 table/table_test.go                | 362 +++++++++++++++++++++++++++++++++++++
 table/transaction.go               |  27 ++-
 transforms.go                      |  10 +-
 10 files changed, 619 insertions(+), 46 deletions(-)

diff --git a/literals.go b/literals.go
index 0671e77..d7b96a9 100644
--- a/literals.go
+++ b/literals.go
@@ -56,6 +56,7 @@ type Literal interface {
        fmt.Stringer
        encoding.BinaryMarshaler
 
+       Any() any
        Type() Type
        To(Type) (Literal, error)
        Equals(Literal) bool
@@ -270,8 +271,8 @@ func (ab aboveMaxLiteral[T]) To(t Type) (Literal, error) {
                ErrBadCast, reflect.TypeOf(T(0)).String())
 }
 
-func (ab aboveMaxLiteral[T]) Value() T { return ab.value }
-
+func (ab aboveMaxLiteral[T]) Value() T       { return ab.value }
+func (ab aboveMaxLiteral[T]) Any() any       { return ab.Value() }
 func (ab aboveMaxLiteral[T]) String() string { return "AboveMax" }
 func (ab aboveMaxLiteral[T]) Equals(other Literal) bool {
        // AboveMaxLiteral isn't comparable and thus isn't even equal to itself
@@ -314,8 +315,8 @@ func (bm belowMinLiteral[T]) To(t Type) (Literal, error) {
                ErrBadCast, reflect.TypeOf(T(0)).String())
 }
 
-func (bm belowMinLiteral[T]) Value() T { return bm.value }
-
+func (bm belowMinLiteral[T]) Value() T       { return bm.value }
+func (bm belowMinLiteral[T]) Any() any       { return bm.Value() }
 func (bm belowMinLiteral[T]) String() string { return "BelowMin" }
 func (bm belowMinLiteral[T]) Equals(other Literal) bool {
        // BelowMinLiteral isn't comparable and thus isn't even equal to itself
@@ -370,6 +371,7 @@ func (BoolLiteral) Comparator() Comparator[bool] {
        }
 }
 
+func (b BoolLiteral) Any() any       { return b.Value() }
 func (b BoolLiteral) Type() Type     { return PrimitiveTypes.Bool }
 func (b BoolLiteral) Value() bool    { return bool(b) }
 func (b BoolLiteral) String() string { return strconv.FormatBool(bool(b)) }
@@ -412,6 +414,7 @@ type Int32Literal int32
 func (Int32Literal) Comparator() Comparator[int32] { return cmp.Compare[int32] 
}
 func (i Int32Literal) Type() Type                  { return 
PrimitiveTypes.Int32 }
 func (i Int32Literal) Value() int32                { return int32(i) }
+func (i Int32Literal) Any() any                    { return i.Value() }
 func (i Int32Literal) String() string              { return 
strconv.FormatInt(int64(i), 10) }
 func (i Int32Literal) To(t Type) (Literal, error) {
        switch t := t.(type) {
@@ -492,6 +495,7 @@ type Int64Literal int64
 func (Int64Literal) Comparator() Comparator[int64] { return cmp.Compare[int64] 
}
 func (i Int64Literal) Type() Type                  { return 
PrimitiveTypes.Int64 }
 func (i Int64Literal) Value() int64                { return int64(i) }
+func (i Int64Literal) Any() any                    { return i.Value() }
 func (i Int64Literal) String() string              { return 
strconv.FormatInt(int64(i), 10) }
 func (i Int64Literal) To(t Type) (Literal, error) {
        switch t := t.(type) {
@@ -577,6 +581,7 @@ type Float32Literal float32
 func (Float32Literal) Comparator() Comparator[float32] { return 
cmp.Compare[float32] }
 func (f Float32Literal) Type() Type                    { return 
PrimitiveTypes.Float32 }
 func (f Float32Literal) Value() float32                { return float32(f) }
+func (f Float32Literal) Any() any                      { return f.Value() }
 func (f Float32Literal) String() string                { return 
strconv.FormatFloat(float64(f), 'g', -1, 32) }
 func (f Float32Literal) To(t Type) (Literal, error) {
        switch t := t.(type) {
@@ -624,6 +629,7 @@ type Float64Literal float64
 func (Float64Literal) Comparator() Comparator[float64] { return 
cmp.Compare[float64] }
 func (f Float64Literal) Type() Type                    { return 
PrimitiveTypes.Float64 }
 func (f Float64Literal) Value() float64                { return float64(f) }
+func (f Float64Literal) Any() any                      { return f.Value() }
 func (f Float64Literal) String() string                { return 
strconv.FormatFloat(float64(f), 'g', -1, 64) }
 func (f Float64Literal) To(t Type) (Literal, error) {
        switch t := t.(type) {
@@ -677,6 +683,7 @@ type DateLiteral Date
 func (DateLiteral) Comparator() Comparator[Date] { return cmp.Compare[Date] }
 func (d DateLiteral) Type() Type                 { return PrimitiveTypes.Date }
 func (d DateLiteral) Value() Date                { return Date(d) }
+func (d DateLiteral) Any() any                   { return d.Value() }
 func (d DateLiteral) String() string {
        t := Date(d).ToTime()
 
@@ -723,6 +730,7 @@ type TimeLiteral Time
 func (TimeLiteral) Comparator() Comparator[Time] { return cmp.Compare[Time] }
 func (t TimeLiteral) Type() Type                 { return PrimitiveTypes.Time }
 func (t TimeLiteral) Value() Time                { return Time(t) }
+func (t TimeLiteral) Any() any                   { return t.Value() }
 func (t TimeLiteral) String() string {
        tm := time.UnixMicro(int64(t)).UTC()
 
@@ -766,6 +774,7 @@ type TimestampLiteral Timestamp
 func (TimestampLiteral) Comparator() Comparator[Timestamp] { return 
cmp.Compare[Timestamp] }
 func (t TimestampLiteral) Type() Type                      { return 
PrimitiveTypes.Timestamp }
 func (t TimestampLiteral) Value() Timestamp                { return 
Timestamp(t) }
+func (t TimestampLiteral) Any() any                        { return t.Value() }
 func (t TimestampLiteral) String() string {
        tm := Timestamp(t).ToTime()
 
@@ -816,6 +825,7 @@ type StringLiteral string
 func (StringLiteral) Comparator() Comparator[string] { return 
cmp.Compare[string] }
 func (s StringLiteral) Type() Type                   { return 
PrimitiveTypes.String }
 func (s StringLiteral) Value() string                { return string(s) }
+func (s StringLiteral) Any() any                     { return s.Value() }
 func (s StringLiteral) String() string               { return string(s) }
 func (s StringLiteral) To(typ Type) (Literal, error) {
        switch t := typ.(type) {
@@ -959,6 +969,7 @@ func (BinaryLiteral) Comparator() Comparator[[]byte] {
 }
 func (b BinaryLiteral) Type() Type     { return PrimitiveTypes.Binary }
 func (b BinaryLiteral) Value() []byte  { return []byte(b) }
+func (b BinaryLiteral) Any() any       { return b.Value() }
 func (b BinaryLiteral) String() string { return string(b) }
 func (b BinaryLiteral) To(typ Type) (Literal, error) {
        switch t := typ.(type) {
@@ -1012,6 +1023,7 @@ type FixedLiteral []byte
 func (FixedLiteral) Comparator() Comparator[[]byte] { return bytes.Compare }
 func (f FixedLiteral) Type() Type                   { return 
FixedTypeOf(len(f)) }
 func (f FixedLiteral) Value() []byte                { return []byte(f) }
+func (f FixedLiteral) Any() any                     { return f.Value() }
 func (f FixedLiteral) String() string               { return string(f) }
 func (f FixedLiteral) To(typ Type) (Literal, error) {
        switch t := typ.(type) {
@@ -1071,6 +1083,7 @@ func (UUIDLiteral) Comparator() Comparator[uuid.UUID] {
 
 func (UUIDLiteral) Type() Type         { return PrimitiveTypes.UUID }
 func (u UUIDLiteral) Value() uuid.UUID { return uuid.UUID(u) }
+func (u UUIDLiteral) Any() any         { return u.Value() }
 func (u UUIDLiteral) String() string   { return uuid.UUID(u).String() }
 func (u UUIDLiteral) To(typ Type) (Literal, error) {
        switch t := typ.(type) {
@@ -1136,6 +1149,7 @@ func (DecimalLiteral) Comparator() Comparator[Decimal] {
 }
 func (d DecimalLiteral) Type() Type     { return DecimalTypeOf(9, d.Scale) }
 func (d DecimalLiteral) Value() Decimal { return Decimal(d) }
+func (d DecimalLiteral) Any() any       { return d.Value() }
 func (d DecimalLiteral) String() string {
        return d.Val.ToString(int32(d.Scale))
 }
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index f987bb4..a7cdd87 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -759,6 +759,9 @@ func (a *arrowProjectionVisitor) castIfNeeded(field 
iceberg.NestedField, vals ar
 
                        panic(fmt.Errorf("unsupported schema projection from %s 
to %s",
                                vals.DataType(), targetType))
+               default:
+                       return retOrPanic(compute.CastArray(a.ctx, vals,
+                               compute.SafeCastOptions(targetType)))
                }
        }
        vals.Retain()
@@ -1184,6 +1187,8 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
 }
 
 type statsAgg interface {
+       min() iceberg.Literal
+       max() iceberg.Literal
        update(stats metadata.TypedStatistics)
        minAsBytes() ([]byte, error)
        maxAsBytes() ([]byte, error)
@@ -1265,6 +1270,9 @@ func createStatsAgg(typ iceberg.PrimitiveType, 
physicalTypeStr string, truncLen
        }
 }
 
+func (s *statsAggregator[T]) min() iceberg.Literal { return s.curMin }
+func (s *statsAggregator[T]) max() iceberg.Literal { return s.curMax }
+
 func (s *statsAggregator[T]) update(stats metadata.TypedStatistics) {
        st := stats.(typedStat[T])
        s.updateMin(st.Min())
@@ -1631,27 +1639,55 @@ type dataFileStatistics struct {
        splitOffsets    []int64
 }
 
-func (d *dataFileStatistics) toDataFile(spec iceberg.PartitionSpec, path 
string, format iceberg.FileFormat, filesize int64) (iceberg.DataFile, error) {
+func (d *dataFileStatistics) partitionValue(field iceberg.PartitionField, sc 
*iceberg.Schema) any {
+       agg, ok := d.colAggs[field.SourceID]
+       if !ok {
+               return nil
+       }
+
+       if !field.Transform.PreservesOrder() {
+               panic(fmt.Errorf("cannot infer partition value from parquet 
metadata for a non-linear partition field: %s with transform %s",
+                       field.Name, field.Transform))
+       }
+
+       lowerVal, upperVal := agg.min(), agg.max()
+       if !lowerVal.Equals(upperVal) {
+               panic(fmt.Errorf("cannot infer partition value from parquet 
metadata as there is more than one value for partition field: %s. (low: %s, 
high: %s)",
+                       field.Name, lowerVal, upperVal))
+       }
+
+       val := field.Transform.Apply(must(partitionRecordValue(field, lowerVal, 
sc)))
+       if !val.Valid {
+               return nil
+       }
+
+       return val.Val.Any()
+}
+
+func (d *dataFileStatistics) toDataFile(schema *iceberg.Schema, spec 
iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64) 
iceberg.DataFile {
+       var partitionData map[string]any
+       if !spec.Equals(*iceberg.UnpartitionedSpec) {
+               partitionData = make(map[string]any)
+               for field := range spec.Fields() {
+                       val := d.partitionValue(field, schema)
+                       if val != nil {
+                               partitionData[field.Name] = val
+                       }
+               }
+       }
+
        bldr, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData,
-               path, format, nil, d.recordCount, filesize)
+               path, format, partitionData, d.recordCount, filesize)
        if err != nil {
-               return nil, err
+               panic(err)
        }
 
        lowerBounds := make(map[int][]byte)
        upperBounds := make(map[int][]byte)
 
        for fieldID, agg := range d.colAggs {
-               min, err := agg.minAsBytes()
-               if err != nil {
-                       return nil, err
-               }
-
-               max, err := agg.maxAsBytes()
-               if err != nil {
-                       return nil, err
-               }
-
+               min := must(agg.minAsBytes())
+               max := must(agg.maxAsBytes())
                if len(min) > 0 {
                        lowerBounds[fieldID] = min
                }
@@ -1666,13 +1702,14 @@ func (d *dataFileStatistics) toDataFile(spec 
iceberg.PartitionSpec, path string,
        if len(upperBounds) > 0 {
                bldr.UpperBoundValues(upperBounds)
        }
+
        bldr.ColumnSizes(d.colSizes)
        bldr.ValueCounts(d.valueCounts)
        bldr.NullValueCounts(d.nullValueCounts)
        bldr.NaNValueCounts(d.nanValueCounts)
        bldr.SplitOffsets(d.splitOffsets)
 
-       return bldr.Build(), nil
+       return bldr.Build()
 }
 
 func dataFileStatsFromParquetMetadata(pqmeta *metadata.FileMetaData, statsCols 
map[int]statisticsCollector, colMapping map[string]int) *dataFileStatistics {
@@ -1807,6 +1844,8 @@ func parquetFilesToDataFiles(fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.
                        }
                }()
 
+               currentSchema, currentSpec := meta.CurrentSchema(), 
meta.CurrentSpec()
+
                for filePath := range paths {
                        inputFile := must(fileIO.Open(filePath))
                        defer inputFile.Close()
@@ -1824,19 +1863,15 @@ func parquetFilesToDataFiles(fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.
                                return
                        }
 
-                       if err := checkArrowSchemaCompat(meta.CurrentSchema(), 
arrSchema, false); err != nil {
+                       if err := checkArrowSchemaCompat(currentSchema, 
arrSchema, false); err != nil {
                                panic(err)
                        }
 
                        statistics := 
dataFileStatsFromParquetMetadata(rdr.MetaData(),
-                               must(computeStatsPlan(meta.CurrentSchema(), 
meta.props)),
-                               
must(parquetPathToIDMapping(meta.CurrentSchema())))
-
-                       df, err := statistics.toDataFile(meta.CurrentSpec(), 
filePath, iceberg.ParquetFile, rdr.MetaData().GetSourceFileSize())
-                       if err != nil {
-                               panic(err)
-                       }
+                               must(computeStatsPlan(currentSchema, 
meta.props)),
+                               must(parquetPathToIDMapping(currentSchema)))
 
+                       df := statistics.toDataFile(currentSchema, currentSpec, 
filePath, iceberg.ParquetFile, rdr.MetaData().GetSourceFileSize())
                        if !yield(df, nil) {
                                return
                        }
diff --git a/table/arrow_utils_internal_test.go 
b/table/arrow_utils_internal_test.go
index 5584d50..3b478e9 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -197,11 +197,9 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta 
iceberg.Properties, writeSt
        suite.Require().NoError(err)
 
        stats := dataFileStatsFromParquetMetadata(fileMeta, collector, mapping)
-       df, err := stats.toDataFile(tableMeta.PartitionSpec(), 
"fake-path.parquet",
-               iceberg.ParquetFile, fileMeta.GetSourceFileSize())
-       suite.Require().NoError(err)
 
-       return df
+       return stats.toDataFile(tableMeta.CurrentSchema(), 
tableMeta.PartitionSpec(), "fake-path.parquet",
+               iceberg.ParquetFile, fileMeta.GetSourceFileSize())
 }
 
 func (suite *FileStatsMetricsSuite) TestRecordCount() {
@@ -345,14 +343,14 @@ func (suite *FileStatsMetricsSuite) 
TestColumnMetricsMode() {
 func (suite *FileStatsMetricsSuite) TestReadMissingStats() {
        df := suite.getDataFile(nil, []string{"strings"})
 
-       stringColIdx := 1
-       suite.Len(df.LowerBoundValues(), 1)
-       suite.Equal([]byte("aaaaaaaaaaaaaaaa"), 
df.LowerBoundValues()[stringColIdx])
+       suite.Len(df.NullValueCounts(), 1)
        suite.Len(df.UpperBoundValues(), 1)
-       suite.Equal([]byte("zzzzzzzzzzzzzzz{"), 
df.UpperBoundValues()[stringColIdx])
+       suite.Len(df.LowerBoundValues(), 1)
 
-       suite.Len(df.NullValueCounts(), 1)
-       suite.EqualValues(1, df.NullValueCounts()[stringColIdx])
+       stringsColIdx := 1
+       suite.Equal("aaaaaaaaaaaaaaaa", 
string(df.LowerBoundValues()[stringsColIdx]))
+       suite.Equal("zzzzzzzzzzzzzzz{", 
string(df.UpperBoundValues()[stringsColIdx]))
+       suite.EqualValues(1, df.NullValueCounts()[stringsColIdx])
 }
 
 func TestFileMetrics(t *testing.T) {
@@ -599,9 +597,8 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
        require.NoError(t, err)
 
        stats := dataFileStatsFromParquetMetadata(meta, collector, mapping)
-       df, err := stats.toDataFile(tblMeta.PartitionSpec(), 
"fake-path.parquet",
+       df := stats.toDataFile(tblMeta.CurrentSchema(), 
tblMeta.PartitionSpec(), "fake-path.parquet",
                iceberg.ParquetFile, meta.GetSourceFileSize())
-       require.NoError(t, err)
 
        assert.Len(t, df.ValueCounts(), 15)
        assert.Len(t, df.NullValueCounts(), 15)
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index d1b5059..770a9be 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -339,7 +339,12 @@ func (p *pruneParquetSchema) Field(field 
pqarrow.SchemaField, result arrow.Field
 }
 
 func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult 
arrow.Field, mapping *iceberg.MappedField) arrow.Field {
-       _, ok := p.selected[p.fieldID(*field.Children[0].Field, mapping)]
+       var elemMapping *iceberg.MappedField
+       if mapping != nil {
+               elemMapping = mapping.GetField("element")
+       }
+
+       _, ok := p.selected[p.fieldID(*field.Children[0].Field, elemMapping)]
        if !ok {
                if elemResult.Type != nil {
                        result := *field.Field
diff --git a/table/scanner.go b/table/scanner.go
index c0eaf93..a058d90 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -488,3 +488,26 @@ func (scan *Scan) ToArrowTable(ctx context.Context) 
(arrow.Table, error) {
 
        return array.NewTableFromRecords(schema, records), nil
 }
+
+func partitionRecordValue(field iceberg.PartitionField, val iceberg.Literal, 
schema *iceberg.Schema) (iceberg.Optional[iceberg.Literal], error) {
+       var ret iceberg.Optional[iceberg.Literal]
+       if val == nil {
+               return ret, nil
+       }
+
+       f, ok := schema.FindFieldByID(field.SourceID)
+       if !ok {
+               return ret, fmt.Errorf("%w: could not find source field in 
schema for %s",
+                       iceberg.ErrInvalidSchema, field.String())
+       }
+
+       out, err := val.To(f.Type)
+       if err != nil {
+               return ret, err
+       }
+
+       ret.Val = out
+       ret.Valid = true
+
+       return ret, nil
+}
diff --git a/table/snapshots.go b/table/snapshots.go
index b12c74a..45b21ad 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -21,6 +21,7 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "iter"
        "maps"
        "slices"
        "strconv"
@@ -306,6 +307,37 @@ func (s Snapshot) Manifests(fio iceio.IO) 
([]iceberg.ManifestFile, error) {
        return nil, nil
 }
 
+func (s Snapshot) dataFiles(fio iceio.IO, fileFilter 
set[iceberg.ManifestEntryContent]) iter.Seq2[iceberg.DataFile, error] {
+       return func(yield func(iceberg.DataFile, error) bool) {
+               manifests, err := s.Manifests(fio)
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }
+
+               for _, m := range manifests {
+                       dataFiles, err := m.FetchEntries(fio, false)
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+
+                       for _, f := range dataFiles {
+                               if fileFilter != nil {
+                                       if _, ok := 
fileFilter[f.DataFile().ContentType()]; !ok {
+                                               continue
+                                       }
+                               }
+                               if !yield(f.DataFile(), nil) {
+                                       return
+                               }
+                       }
+               }
+       }
+}
+
 type MetadataLogEntry struct {
        MetadataFile string `json:"metadata-file"`
        TimestampMs  int64  `json:"timestamp-ms"`
diff --git a/table/table.go b/table/table.go
index 2584b72..db679e6 100644
--- a/table/table.go
+++ b/table/table.go
@@ -27,6 +27,8 @@ import (
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/internal"
        "github.com/apache/iceberg-go/io"
+       tblutils "github.com/apache/iceberg-go/table/internal"
+       "golang.org/x/sync/errgroup"
 )
 
 type Identifier = []string
@@ -86,6 +88,86 @@ func (t Table) NewTransaction() *Transaction {
        }
 }
 
+func (t Table) AllManifests() iter.Seq2[iceberg.ManifestFile, error] {
+       type list = tblutils.Enumerated[[]iceberg.ManifestFile]
+       g := errgroup.Group{}
+
+       n := len(t.metadata.Snapshots())
+       ch := make(chan list, n)
+       for i, sn := range t.metadata.Snapshots() {
+               g.Go(func() error {
+                       manifests, err := sn.Manifests(t.fs)
+                       if err != nil {
+                               return err
+                       }
+
+                       ch <- list{Index: i, Value: manifests, Last: i == n-1}
+
+                       return nil
+               })
+       }
+
+       errch := make(chan error, 1)
+       go func() {
+               defer close(errch)
+               defer close(ch)
+               if err := g.Wait(); err != nil {
+                       errch <- err
+               }
+       }()
+
+       results := tblutils.MakeSequencedChan(uint(n), ch,
+               func(left, right *list) bool {
+                       switch {
+                       case left.Index < 0:
+                               return true
+                       case right.Index < 0:
+                               return false
+                       default:
+                               return left.Index < right.Index
+                       }
+               }, func(prev, next *list) bool {
+                       if prev.Index < 0 {
+                               return next.Index == 0
+                       }
+
+                       return next.Index == prev.Index+1
+               }, list{Index: -1})
+
+       return func(yield func(iceberg.ManifestFile, error) bool) {
+               defer func() {
+                       // drain channels if we exited early
+                       go func() {
+                               for range results {
+                               }
+                               for range errch {
+                               }
+                       }()
+               }()
+
+               for {
+                       select {
+                       case err := <-errch:
+                               if err != nil {
+                                       yield(nil, err)
+
+                                       return
+                               }
+                       case next, ok := <-results:
+                               for _, mf := range next.Value {
+                                       if !yield(mf, nil) {
+                                               return
+                                       }
+                               }
+
+                               if next.Last || !ok {
+                                       return
+                               }
+                       }
+               }
+       }
+}
+
 func (t Table) doCommit(ctx context.Context, updates []Update, reqs 
[]Requirement) (*Table, error) {
        newMeta, newLoc, err := t.cat.CommitTable(ctx, &t, reqs, updates)
        if err != nil {
diff --git a/table/table_test.go b/table/table_test.go
index 8ae66d7..044fa01 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -150,6 +150,10 @@ type TableWritingTestSuite struct {
        arrSchemaUpdated *arrow.Schema
        arrTblUpdated    arrow.Table
 
+       tableSchemaPromotedTypes *iceberg.Schema
+       arrSchemaPromotedTypes   *arrow.Schema
+       arrTablePromotedTypes    arrow.Table
+
        location      string
        formatVersion int
 }
@@ -211,6 +215,44 @@ func (t *TableWritingTestSuite) SetupSuite() {
                `[{"foo": true, "baz": 123, "qux": "2024-03-07", "quux": 234}]`,
        })
        t.Require().NoError(err)
+
+       t.tableSchemaPromotedTypes = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "long", Type: 
iceberg.PrimitiveTypes.Int64},
+               iceberg.NestedField{
+                       ID: 2, Name: "list",
+                       Type:     &iceberg.ListType{ElementID: 4, Element: 
iceberg.PrimitiveTypes.Int64},
+                       Required: true,
+               },
+               iceberg.NestedField{
+                       ID: 3, Name: "map",
+                       Type: &iceberg.MapType{
+                               KeyID:     5,
+                               KeyType:   iceberg.PrimitiveTypes.String,
+                               ValueID:   6,
+                               ValueType: iceberg.PrimitiveTypes.Int64,
+                       },
+                       Required: true,
+               },
+               iceberg.NestedField{ID: 7, Name: "double", Type: 
iceberg.PrimitiveTypes.Float64})
+       // arrow-go needs to implement cast_extension for [16]byte -> uuid
+       // iceberg.NestedField{ID: 8, Name: "uuid", Type: 
iceberg.PrimitiveTypes.UUID})
+
+       t.arrSchemaPromotedTypes = arrow.NewSchema([]arrow.Field{
+               {Name: "long", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+               {Name: "list", Type: arrow.ListOf(arrow.PrimitiveTypes.Int32), 
Nullable: false},
+               {Name: "map", Type: arrow.MapOf(arrow.BinaryTypes.String, 
arrow.PrimitiveTypes.Int32), Nullable: false},
+               {Name: "double", Type: arrow.PrimitiveTypes.Float32, Nullable: 
true},
+       }, nil)
+       // arrow-go needs to implement cast_extension for [16]byte -> uuid
+       // {Name: "uuid", Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}, 
Nullable: true}}, nil)
+
+       t.arrTablePromotedTypes, err = array.TableFromJSON(mem, 
t.arrSchemaPromotedTypes, []string{
+               `[
+                       {"long": 1, "list": [1, 1], "map": [{"key": "a", 
"value": 1}], "double": 1.1, "uuid": "cVp4705TQImb+TrQ7pv1RQ=="},
+                       {"long": 9, "list": [2, 2], "map": [{"key": "b", 
"value": 2}], "double": 9.2, "uuid": "l12HVF5KREqWl/R25AMM3g=="}
+               ]`,
+       })
+       t.Require().NoError(err)
 }
 
 func (t *TableWritingTestSuite) SetupTest() {
@@ -367,6 +409,326 @@ func (t *TableWritingTestSuite) 
TestAddFilesFailsSchemaMismatch() {
 `)
 }
 
+func (t *TableWritingTestSuite) TestAddFilesPartitionedTable() {
+       ident := table.Identifier{"default", "partitioned_table_v" + 
strconv.Itoa(t.formatVersion)}
+       spec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: 
iceberg.IdentityTransform{}, Name: "baz"},
+               iceberg.PartitionField{SourceID: 10, FieldID: 1001, Transform: 
iceberg.MonthTransform{}, Name: "qux_month"})
+
+       tbl := t.createTable(ident, t.formatVersion,
+               spec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       dates := []string{
+               "2024-03-07", "2024-03-08", "2024-03-16", "2024-03-18", 
"2024-03-19",
+       }
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet", 
t.location, i)
+               table, err := array.TableFromJSON(memory.DefaultAllocator, 
t.arrSchema, []string{
+                       `[{"foo": true, "bar": "bar_string", "baz": 123, "qux": 
"` + dates[i] + `"}]`,
+               })
+               t.Require().NoError(err)
+               defer table.Release()
+
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(files, nil, false))
+
+       stagedTbl, err := tx.StagedTable()
+       t.Require().NoError(err)
+       t.NotNil(stagedTbl.NameMapping())
+
+       t.Equal(stagedTbl.CurrentSnapshot().Summary,
+               &table.Summary{
+                       Operation: table.OpAppend,
+                       Properties: iceberg.Properties{
+                               "added-data-files":        "5",
+                               "added-files-size":        "3660",
+                               "added-records":           "5",
+                               "changed-partition-count": "1",
+                               "total-data-files":        "5",
+                               "total-delete-files":      "0",
+                               "total-equality-deletes":  "0",
+                               "total-files-size":        "3660",
+                               "total-position-deletes":  "0",
+                               "total-records":           "5",
+                       },
+               })
+
+       m, err := stagedTbl.CurrentSnapshot().Manifests(tbl.FS())
+       t.Require().NoError(err)
+
+       for _, manifest := range m {
+               entries, err := manifest.FetchEntries(tbl.FS(), false)
+               t.Require().NoError(err)
+
+               for _, e := range entries {
+                       t.Equal(map[string]any{
+                               "baz": 123, "qux_month": 650,
+                       }, e.DataFile().Partition())
+               }
+       }
+}
+
+func (t *TableWritingTestSuite) TestAddFilesToBucketPartitionedTableFails() {
+       ident := table.Identifier{"default", "partitioned_table_bucket_fails_v" 
+ strconv.Itoa(t.formatVersion)}
+       spec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: 
iceberg.BucketTransform{NumBuckets: 3}, Name: "baz_bucket_3"})
+
+       tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet", 
t.location, i)
+               table, err := array.TableFromJSON(memory.DefaultAllocator, 
t.arrSchema, []string{
+                       `[{"foo": true, "bar": "bar_string", "baz": ` + 
strconv.Itoa(i) + `, "qux": "2024-03-07"}]`,
+               })
+               t.Require().NoError(err)
+               defer table.Release()
+
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       err := tx.AddFiles(files, nil, false)
+       t.Error(err)
+       t.ErrorContains(err, "cannot infer partition value from parquet 
metadata for a non-linear partition field: baz_bucket_3 with transform 
bucket[3]")
+}
+
+func (t *TableWritingTestSuite) 
TestAddFilesToPartitionedTableFailsLowerAndUpperMismatch() {
+       ident := table.Identifier{"default", "partitioned_table_bucket_fails_v" 
+ strconv.Itoa(t.formatVersion)}
+       spec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform: 
iceberg.IdentityTransform{}, Name: "baz"})
+
+       tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet", 
t.location, i)
+               table, err := array.TableFromJSON(memory.DefaultAllocator, 
t.arrSchema, []string{
+                       `[
+                               {"foo": true, "bar": "bar_string", "baz": 123, 
"qux": "2024-03-07"},
+                               {"foo": true, "bar": "bar_string", "baz": 124, 
"qux": "2024-03-07"}
+                       ]`,
+               })
+               t.Require().NoError(err)
+               defer table.Release()
+
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       err := tx.AddFiles(files, nil, false)
+       t.Error(err)
+       t.ErrorContains(err, "cannot infer partition value from parquet 
metadata as there is more than one value for partition field: baz. (low: 123, 
high: 124)")
+}
+
+func (t *TableWritingTestSuite) TestAddFilesWithLargeAndRegular() {
+       ident := table.Identifier{"default", "unpartitioned_with_large_types_v" 
+ strconv.Itoa(t.formatVersion)}
+       ice := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String, Required: true})
+
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.BinaryTypes.String},
+       }, nil)
+       arrowSchemaLarge := arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.BinaryTypes.LargeString},
+       }, nil)
+
+       tbl := t.createTable(ident, t.formatVersion, 
*iceberg.UnpartitionedSpec, ice)
+       t.Require().NotNil(tbl)
+
+       filePath := 
fmt.Sprintf("%s/unpartitioned_with_large_types/v%d/test-0.parquet", t.location, 
t.formatVersion)
+       filePathLarge := 
fmt.Sprintf("%s/unpartitioned_with_large_types/v%d/test-1.parquet", t.location, 
t.formatVersion)
+
+       arrTable, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchema, []string{
+               `[{"foo": "bar"}]`,
+       })
+       t.Require().NoError(err)
+       defer arrTable.Release()
+
+       arrTableLarge, err := array.TableFromJSON(memory.DefaultAllocator, 
arrowSchemaLarge,
+               []string{`[{"foo": "bar"}]`})
+       t.Require().NoError(err)
+       defer arrTableLarge.Release()
+
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, arrTable)
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePathLarge, 
arrTableLarge)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles([]string{filePath, filePathLarge}, nil, 
false))
+
+       stagedTbl, err := tx.StagedTable()
+       t.Require().NoError(err)
+
+       result, err := stagedTbl.Scan(table.WithOptions(iceberg.Properties{
+               table.ScanOptionArrowUseLargeTypes: "true",
+       })).ToArrowTable(context.Background())
+       t.Require().NoError(err)
+       defer result.Release()
+
+       t.EqualValues(2, result.NumRows())
+       t.Truef(arrowSchemaLarge.Equal(result.Schema()), "expected schema: %s, 
got: %s", arrowSchemaLarge, result.Schema())
+}
+
+func (t *TableWritingTestSuite) TestAddFilesValidUpcast() {
+       ident := table.Identifier{"default", "test_table_with_valid_upcast_v" + 
strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, 
*iceberg.UnpartitionedSpec, t.tableSchemaPromotedTypes)
+
+       filePath := 
fmt.Sprintf("%s/test_table_with_valid_upcast_v%d/test.parquet", t.location, 
t.formatVersion)
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, 
t.arrTablePromotedTypes)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles([]string{filePath}, nil, false))
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+
+       written, err := staged.Scan().ToArrowTable(context.Background())
+       t.Require().NoError(err)
+       defer written.Release()
+
+       t.EqualValues(2, written.NumRows())
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "long", Type: arrow.PrimitiveTypes.Int64, Nullable: 
true},
+               {Name: "list", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64), 
Nullable: false},
+               {Name: "map", Type: arrow.MapOf(arrow.BinaryTypes.String, 
arrow.PrimitiveTypes.Int64), Nullable: false},
+               {Name: "double", Type: arrow.PrimitiveTypes.Float64, Nullable: 
true},
+       }, nil)
+       t.True(expectedSchema.Equal(written.Schema()))
+}
+
+func dropColFromTable(idx int, tbl arrow.Table) arrow.Table {
+       if idx < 0 || idx >= int(tbl.NumCols()) {
+               panic("invalid column to drop")
+       }
+
+       fields := tbl.Schema().Fields()
+       fields = append(fields[:idx], fields[idx+1:]...)
+
+       cols := make([]arrow.Column, 0, tbl.NumCols()-1)
+       for i := 0; i < int(tbl.NumCols()); i++ {
+               if i == idx {
+                       continue
+               }
+               cols = append(cols, *tbl.Column(i))
+       }
+
+       return array.NewTable(arrow.NewSchema(fields, nil), cols, tbl.NumRows())
+}
+
+func (t *TableWritingTestSuite) TestAddFilesSubsetOfSchema() {
+       ident := table.Identifier{"default", 
"test_table_with_subset_of_schema_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, 
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := 
fmt.Sprintf("%s/test_table_with_subset_of_schema_v%d/test.parquet", t.location, 
t.formatVersion)
+       withoutCol := dropColFromTable(0, t.arrTbl)
+       defer withoutCol.Release()
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, withoutCol)
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles([]string{filePath}, nil, false))
+
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+
+       written, err := staged.Scan().ToArrowTable(context.Background())
+       t.Require().NoError(err)
+       defer written.Release()
+
+       t.EqualValues(1, written.NumRows())
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "foo", Type: arrow.FixedWidthTypes.Boolean, Nullable: 
true},
+               {Name: "bar", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "baz", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+               {Name: "qux", Type: arrow.PrimitiveTypes.Date32, Nullable: 
true},
+       }, nil)
+       t.True(expectedSchema.Equal(written.Schema()), expectedSchema.String(), 
written.Schema().String())
+
+       result, err := array.TableFromJSON(memory.DefaultAllocator, 
t.arrSchema, []string{
+               `[{"foo": null, "bar": "bar_string", "baz": 123, "qux": 
"2024-03-07"}]`,
+       })
+       t.Require().NoError(err)
+       defer result.Release()
+
+       t.True(array.TableEqual(result, written))
+}
+
+func (t *TableWritingTestSuite) TestAddFilesDuplicateFilesInFilePaths() {
+       ident := table.Identifier{"default", 
"test_table_with_duplicate_files_v" + strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion, 
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+       filePath := 
fmt.Sprintf("%s/test_table_with_duplicate_files_v%d/test.parquet", t.location, 
t.formatVersion)
+       t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+
+       tx := tbl.NewTransaction()
+       err := tx.AddFiles([]string{filePath, filePath}, nil, false)
+       t.Error(err)
+       t.ErrorContains(err, "file paths must be unique for AddFiles")
+}
+
+func (t *TableWritingTestSuite) TestAddFilesReferencedByCurrentSnapshot() {
+       ident := table.Identifier{"default", "add_files_referenced_v" + 
strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := 
fmt.Sprintf("%s/add_files_referenced/test-%d.parquet", t.location, i)
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(files, nil, false))
+
+       err := tx.AddFiles(files[len(files)-1:], nil, false)
+       t.Error(err)
+       t.ErrorContains(err, "cannot add files that are already referenced by 
table, files:")
+}
+
+func (t *TableWritingTestSuite) 
TestAddFilesReferencedCurrentSnapshotIgnoreDuplicates() {
+       ident := table.Identifier{"default", "add_files_referenced_v" + 
strconv.Itoa(t.formatVersion)}
+       tbl := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       t.NotNil(tbl)
+
+       files := make([]string, 0)
+       for i := range 5 {
+               filePath := 
fmt.Sprintf("%s/add_files_referenced/test-%d.parquet", t.location, i)
+               t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+               files = append(files, filePath)
+       }
+
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.AddFiles(files, nil, false))
+
+       t.Require().NoError(tx.AddFiles(files[len(files)-1:], nil, true))
+       staged, err := tx.StagedTable()
+       t.Require().NoError(err)
+
+       added, existing, deleted := []int32{}, []int32{}, []int32{}
+       for m, err := range staged.AllManifests() {
+               t.Require().NoError(err)
+               added = append(added, m.AddedDataFiles())
+               existing = append(existing, m.ExistingDataFiles())
+               deleted = append(deleted, m.DeletedDataFiles())
+       }
+
+       t.Equal([]int32{5, 1, 5}, added)
+       t.Equal([]int32{0, 0, 0}, existing)
+       t.Equal([]int32{0, 0, 0}, deleted)
+}
+
 func TestTableWriting(t *testing.T) {
        suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
        suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index 15785b5..bdbc62b 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -21,6 +21,7 @@ import (
        "context"
        "encoding/json"
        "errors"
+       "fmt"
        "slices"
        "sync"
        "time"
@@ -119,18 +120,32 @@ func (t *Transaction) Append(rdr array.RecordReader, 
snapshotProps iceberg.Prope
        return iceberg.ErrNotImplemented
 }
 
-func (t *Transaction) AddFiles(files []string, snapshotProps 
iceberg.Properties, checkDuplicates bool) error {
+func (t *Transaction) AddFiles(files []string, snapshotProps 
iceberg.Properties, ignoreDuplicates bool) error {
        set := make(map[string]string)
        for _, f := range files {
                set[f] = f
        }
 
        if len(set) != len(files) {
-               return errors.New("file paths must be unique for 
AppendDataFiles")
-       }
-
-       if checkDuplicates {
-               return iceberg.ErrNotImplemented
+               return errors.New("file paths must be unique for AddFiles")
+       }
+
+       if !ignoreDuplicates {
+               if s := t.meta.currentSnapshot(); s != nil {
+                       referenced := make([]string, 0)
+                       for df, err := range s.dataFiles(t.tbl.fs, nil) {
+                               if err != nil {
+                                       return err
+                               }
+
+                               if _, ok := set[df.FilePath()]; ok {
+                                       referenced = append(referenced, 
df.FilePath())
+                               }
+                       }
+                       if len(referenced) > 0 {
+                               return fmt.Errorf("cannot add files that are 
already referenced by table, files: %s", referenced)
+                       }
+               }
        }
 
        if t.meta.NameMapping() == nil {
diff --git a/transforms.go b/transforms.go
index 3066ea7..fb37e6c 100644
--- a/transforms.go
+++ b/transforms.go
@@ -86,6 +86,7 @@ type Transform interface {
        fmt.Stringer
        encoding.TextMarshaler
        ResultType(t Type) Type
+       PreservesOrder() bool
        Equals(Transform) bool
        Apply(Optional[Literal]) Optional[Literal]
        Project(name string, pred BoundPredicate) (UnboundPredicate, error)
@@ -104,6 +105,7 @@ func (t IdentityTransform) MarshalText() ([]byte, error) {
 func (IdentityTransform) String() string { return "identity" }
 
 func (IdentityTransform) ResultType(t Type) Type { return t }
+func (IdentityTransform) PreservesOrder() bool   { return true }
 
 func (IdentityTransform) Equals(other Transform) bool {
        _, ok := other.(IdentityTransform)
@@ -161,6 +163,7 @@ func (t VoidTransform) MarshalText() ([]byte, error) {
 func (VoidTransform) String() string { return "void" }
 
 func (VoidTransform) ResultType(t Type) Type { return t }
+func (VoidTransform) PreservesOrder() bool   { return false }
 
 func (VoidTransform) Equals(other Transform) bool {
        _, ok := other.(VoidTransform)
@@ -193,6 +196,7 @@ func (t BucketTransform) MarshalText() ([]byte, error) {
 func (t BucketTransform) String() string { return fmt.Sprintf("bucket[%d]", 
t.NumBuckets) }
 
 func (BucketTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (BucketTransform) PreservesOrder() bool { return false }
 
 func hashHelperInt[T ~int32 | ~int64](v any) uint32 {
        var (
@@ -353,7 +357,7 @@ func (t TruncateTransform) MarshalText() ([]byte, error) {
 func (t TruncateTransform) String() string { return 
fmt.Sprintf("truncate[%d]", t.Width) }
 
 func (TruncateTransform) ResultType(t Type) Type { return t }
-
+func (TruncateTransform) PreservesOrder() bool   { return true }
 func (t TruncateTransform) Equals(other Transform) bool {
        rhs, ok := other.(TruncateTransform)
        if !ok {
@@ -550,6 +554,7 @@ func (t YearTransform) MarshalText() ([]byte, error) {
 func (YearTransform) String() string { return "year" }
 
 func (YearTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (YearTransform) PreservesOrder() bool { return true }
 
 func (YearTransform) Equals(other Transform) bool {
        _, ok := other.(YearTransform)
@@ -627,6 +632,7 @@ func (t MonthTransform) MarshalText() ([]byte, error) {
 func (MonthTransform) String() string { return "month" }
 
 func (MonthTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (MonthTransform) PreservesOrder() bool { return true }
 
 func (MonthTransform) Equals(other Transform) bool {
        _, ok := other.(MonthTransform)
@@ -715,6 +721,7 @@ func (t DayTransform) MarshalText() ([]byte, error) {
 func (DayTransform) String() string { return "day" }
 
 func (DayTransform) ResultType(Type) Type { return PrimitiveTypes.Date }
+func (DayTransform) PreservesOrder() bool { return true }
 
 func (DayTransform) Equals(other Transform) bool {
        _, ok := other.(DayTransform)
@@ -792,6 +799,7 @@ func (t HourTransform) MarshalText() ([]byte, error) {
 func (HourTransform) String() string { return "hour" }
 
 func (HourTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (HourTransform) PreservesOrder() bool { return true }
 
 func (HourTransform) Equals(other Transform) bool {
        _, ok := other.(HourTransform)


Reply via email to