This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new f71d87c feat(table): Add files to partitioned tables (#338)
f71d87c is described below
commit f71d87cef43b06cbb4040afb5b73bf17e372b4c8
Author: Matt Topol <[email protected]>
AuthorDate: Tue Mar 18 11:20:34 2025 -0400
feat(table): Add files to partitioned tables (#338)
Next step in supporting AddFiles: allow adding files to
partitioned tables and properly set the partition data on the
DataFile objects.
---
literals.go | 22 ++-
table/arrow_utils.go | 79 +++++---
table/arrow_utils_internal_test.go | 21 +--
table/internal/parquet_files.go | 7 +-
table/scanner.go | 23 +++
table/snapshots.go | 32 ++++
table/table.go | 82 +++++++++
table/table_test.go | 362 +++++++++++++++++++++++++++++++++++++
table/transaction.go | 27 ++-
transforms.go | 10 +-
10 files changed, 619 insertions(+), 46 deletions(-)
diff --git a/literals.go b/literals.go
index 0671e77..d7b96a9 100644
--- a/literals.go
+++ b/literals.go
@@ -56,6 +56,7 @@ type Literal interface {
fmt.Stringer
encoding.BinaryMarshaler
+ Any() any
Type() Type
To(Type) (Literal, error)
Equals(Literal) bool
@@ -270,8 +271,8 @@ func (ab aboveMaxLiteral[T]) To(t Type) (Literal, error) {
ErrBadCast, reflect.TypeOf(T(0)).String())
}
-func (ab aboveMaxLiteral[T]) Value() T { return ab.value }
-
+func (ab aboveMaxLiteral[T]) Value() T { return ab.value }
+func (ab aboveMaxLiteral[T]) Any() any { return ab.Value() }
func (ab aboveMaxLiteral[T]) String() string { return "AboveMax" }
func (ab aboveMaxLiteral[T]) Equals(other Literal) bool {
// AboveMaxLiteral isn't comparable and thus isn't even equal to itself
@@ -314,8 +315,8 @@ func (bm belowMinLiteral[T]) To(t Type) (Literal, error) {
ErrBadCast, reflect.TypeOf(T(0)).String())
}
-func (bm belowMinLiteral[T]) Value() T { return bm.value }
-
+func (bm belowMinLiteral[T]) Value() T { return bm.value }
+func (bm belowMinLiteral[T]) Any() any { return bm.Value() }
func (bm belowMinLiteral[T]) String() string { return "BelowMin" }
func (bm belowMinLiteral[T]) Equals(other Literal) bool {
// BelowMinLiteral isn't comparable and thus isn't even equal to itself
@@ -370,6 +371,7 @@ func (BoolLiteral) Comparator() Comparator[bool] {
}
}
+func (b BoolLiteral) Any() any { return b.Value() }
func (b BoolLiteral) Type() Type { return PrimitiveTypes.Bool }
func (b BoolLiteral) Value() bool { return bool(b) }
func (b BoolLiteral) String() string { return strconv.FormatBool(bool(b)) }
@@ -412,6 +414,7 @@ type Int32Literal int32
func (Int32Literal) Comparator() Comparator[int32] { return cmp.Compare[int32]
}
func (i Int32Literal) Type() Type { return
PrimitiveTypes.Int32 }
func (i Int32Literal) Value() int32 { return int32(i) }
+func (i Int32Literal) Any() any { return i.Value() }
func (i Int32Literal) String() string { return
strconv.FormatInt(int64(i), 10) }
func (i Int32Literal) To(t Type) (Literal, error) {
switch t := t.(type) {
@@ -492,6 +495,7 @@ type Int64Literal int64
func (Int64Literal) Comparator() Comparator[int64] { return cmp.Compare[int64]
}
func (i Int64Literal) Type() Type { return
PrimitiveTypes.Int64 }
func (i Int64Literal) Value() int64 { return int64(i) }
+func (i Int64Literal) Any() any { return i.Value() }
func (i Int64Literal) String() string { return
strconv.FormatInt(int64(i), 10) }
func (i Int64Literal) To(t Type) (Literal, error) {
switch t := t.(type) {
@@ -577,6 +581,7 @@ type Float32Literal float32
func (Float32Literal) Comparator() Comparator[float32] { return
cmp.Compare[float32] }
func (f Float32Literal) Type() Type { return
PrimitiveTypes.Float32 }
func (f Float32Literal) Value() float32 { return float32(f) }
+func (f Float32Literal) Any() any { return f.Value() }
func (f Float32Literal) String() string { return
strconv.FormatFloat(float64(f), 'g', -1, 32) }
func (f Float32Literal) To(t Type) (Literal, error) {
switch t := t.(type) {
@@ -624,6 +629,7 @@ type Float64Literal float64
func (Float64Literal) Comparator() Comparator[float64] { return
cmp.Compare[float64] }
func (f Float64Literal) Type() Type { return
PrimitiveTypes.Float64 }
func (f Float64Literal) Value() float64 { return float64(f) }
+func (f Float64Literal) Any() any { return f.Value() }
func (f Float64Literal) String() string { return
strconv.FormatFloat(float64(f), 'g', -1, 64) }
func (f Float64Literal) To(t Type) (Literal, error) {
switch t := t.(type) {
@@ -677,6 +683,7 @@ type DateLiteral Date
func (DateLiteral) Comparator() Comparator[Date] { return cmp.Compare[Date] }
func (d DateLiteral) Type() Type { return PrimitiveTypes.Date }
func (d DateLiteral) Value() Date { return Date(d) }
+func (d DateLiteral) Any() any { return d.Value() }
func (d DateLiteral) String() string {
t := Date(d).ToTime()
@@ -723,6 +730,7 @@ type TimeLiteral Time
func (TimeLiteral) Comparator() Comparator[Time] { return cmp.Compare[Time] }
func (t TimeLiteral) Type() Type { return PrimitiveTypes.Time }
func (t TimeLiteral) Value() Time { return Time(t) }
+func (t TimeLiteral) Any() any { return t.Value() }
func (t TimeLiteral) String() string {
tm := time.UnixMicro(int64(t)).UTC()
@@ -766,6 +774,7 @@ type TimestampLiteral Timestamp
func (TimestampLiteral) Comparator() Comparator[Timestamp] { return
cmp.Compare[Timestamp] }
func (t TimestampLiteral) Type() Type { return
PrimitiveTypes.Timestamp }
func (t TimestampLiteral) Value() Timestamp { return
Timestamp(t) }
+func (t TimestampLiteral) Any() any { return t.Value() }
func (t TimestampLiteral) String() string {
tm := Timestamp(t).ToTime()
@@ -816,6 +825,7 @@ type StringLiteral string
func (StringLiteral) Comparator() Comparator[string] { return
cmp.Compare[string] }
func (s StringLiteral) Type() Type { return
PrimitiveTypes.String }
func (s StringLiteral) Value() string { return string(s) }
+func (s StringLiteral) Any() any { return s.Value() }
func (s StringLiteral) String() string { return string(s) }
func (s StringLiteral) To(typ Type) (Literal, error) {
switch t := typ.(type) {
@@ -959,6 +969,7 @@ func (BinaryLiteral) Comparator() Comparator[[]byte] {
}
func (b BinaryLiteral) Type() Type { return PrimitiveTypes.Binary }
func (b BinaryLiteral) Value() []byte { return []byte(b) }
+func (b BinaryLiteral) Any() any { return b.Value() }
func (b BinaryLiteral) String() string { return string(b) }
func (b BinaryLiteral) To(typ Type) (Literal, error) {
switch t := typ.(type) {
@@ -1012,6 +1023,7 @@ type FixedLiteral []byte
func (FixedLiteral) Comparator() Comparator[[]byte] { return bytes.Compare }
func (f FixedLiteral) Type() Type { return
FixedTypeOf(len(f)) }
func (f FixedLiteral) Value() []byte { return []byte(f) }
+func (f FixedLiteral) Any() any { return f.Value() }
func (f FixedLiteral) String() string { return string(f) }
func (f FixedLiteral) To(typ Type) (Literal, error) {
switch t := typ.(type) {
@@ -1071,6 +1083,7 @@ func (UUIDLiteral) Comparator() Comparator[uuid.UUID] {
func (UUIDLiteral) Type() Type { return PrimitiveTypes.UUID }
func (u UUIDLiteral) Value() uuid.UUID { return uuid.UUID(u) }
+func (u UUIDLiteral) Any() any { return u.Value() }
func (u UUIDLiteral) String() string { return uuid.UUID(u).String() }
func (u UUIDLiteral) To(typ Type) (Literal, error) {
switch t := typ.(type) {
@@ -1136,6 +1149,7 @@ func (DecimalLiteral) Comparator() Comparator[Decimal] {
}
func (d DecimalLiteral) Type() Type { return DecimalTypeOf(9, d.Scale) }
func (d DecimalLiteral) Value() Decimal { return Decimal(d) }
+func (d DecimalLiteral) Any() any { return d.Value() }
func (d DecimalLiteral) String() string {
return d.Val.ToString(int32(d.Scale))
}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index f987bb4..a7cdd87 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -759,6 +759,9 @@ func (a *arrowProjectionVisitor) castIfNeeded(field
iceberg.NestedField, vals ar
panic(fmt.Errorf("unsupported schema projection from %s
to %s",
vals.DataType(), targetType))
+ default:
+ return retOrPanic(compute.CastArray(a.ctx, vals,
+ compute.SafeCastOptions(targetType)))
}
}
vals.Retain()
@@ -1184,6 +1187,8 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
}
type statsAgg interface {
+ min() iceberg.Literal
+ max() iceberg.Literal
update(stats metadata.TypedStatistics)
minAsBytes() ([]byte, error)
maxAsBytes() ([]byte, error)
@@ -1265,6 +1270,9 @@ func createStatsAgg(typ iceberg.PrimitiveType,
physicalTypeStr string, truncLen
}
}
+func (s *statsAggregator[T]) min() iceberg.Literal { return s.curMin }
+func (s *statsAggregator[T]) max() iceberg.Literal { return s.curMax }
+
func (s *statsAggregator[T]) update(stats metadata.TypedStatistics) {
st := stats.(typedStat[T])
s.updateMin(st.Min())
@@ -1631,27 +1639,55 @@ type dataFileStatistics struct {
splitOffsets []int64
}
-func (d *dataFileStatistics) toDataFile(spec iceberg.PartitionSpec, path
string, format iceberg.FileFormat, filesize int64) (iceberg.DataFile, error) {
+func (d *dataFileStatistics) partitionValue(field iceberg.PartitionField, sc
*iceberg.Schema) any {
+ agg, ok := d.colAggs[field.SourceID]
+ if !ok {
+ return nil
+ }
+
+ if !field.Transform.PreservesOrder() {
+ panic(fmt.Errorf("cannot infer partition value from parquet
metadata for a non-linear partition field: %s with transform %s",
+ field.Name, field.Transform))
+ }
+
+ lowerVal, upperVal := agg.min(), agg.max()
+ if !lowerVal.Equals(upperVal) {
+ panic(fmt.Errorf("cannot infer partition value from parquet
metadata as there is more than one value for partition field: %s. (low: %s,
high: %s)",
+ field.Name, lowerVal, upperVal))
+ }
+
+ val := field.Transform.Apply(must(partitionRecordValue(field, lowerVal,
sc)))
+ if !val.Valid {
+ return nil
+ }
+
+ return val.Val.Any()
+}
+
+func (d *dataFileStatistics) toDataFile(schema *iceberg.Schema, spec
iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64)
iceberg.DataFile {
+ var partitionData map[string]any
+ if !spec.Equals(*iceberg.UnpartitionedSpec) {
+ partitionData = make(map[string]any)
+ for field := range spec.Fields() {
+ val := d.partitionValue(field, schema)
+ if val != nil {
+ partitionData[field.Name] = val
+ }
+ }
+ }
+
bldr, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData,
- path, format, nil, d.recordCount, filesize)
+ path, format, partitionData, d.recordCount, filesize)
if err != nil {
- return nil, err
+ panic(err)
}
lowerBounds := make(map[int][]byte)
upperBounds := make(map[int][]byte)
for fieldID, agg := range d.colAggs {
- min, err := agg.minAsBytes()
- if err != nil {
- return nil, err
- }
-
- max, err := agg.maxAsBytes()
- if err != nil {
- return nil, err
- }
-
+ min := must(agg.minAsBytes())
+ max := must(agg.maxAsBytes())
if len(min) > 0 {
lowerBounds[fieldID] = min
}
@@ -1666,13 +1702,14 @@ func (d *dataFileStatistics) toDataFile(spec
iceberg.PartitionSpec, path string,
if len(upperBounds) > 0 {
bldr.UpperBoundValues(upperBounds)
}
+
bldr.ColumnSizes(d.colSizes)
bldr.ValueCounts(d.valueCounts)
bldr.NullValueCounts(d.nullValueCounts)
bldr.NaNValueCounts(d.nanValueCounts)
bldr.SplitOffsets(d.splitOffsets)
- return bldr.Build(), nil
+ return bldr.Build()
}
func dataFileStatsFromParquetMetadata(pqmeta *metadata.FileMetaData, statsCols
map[int]statisticsCollector, colMapping map[string]int) *dataFileStatistics {
@@ -1807,6 +1844,8 @@ func parquetFilesToDataFiles(fileIO iceio.IO, meta
*MetadataBuilder, paths iter.
}
}()
+ currentSchema, currentSpec := meta.CurrentSchema(),
meta.CurrentSpec()
+
for filePath := range paths {
inputFile := must(fileIO.Open(filePath))
defer inputFile.Close()
@@ -1824,19 +1863,15 @@ func parquetFilesToDataFiles(fileIO iceio.IO, meta
*MetadataBuilder, paths iter.
return
}
- if err := checkArrowSchemaCompat(meta.CurrentSchema(),
arrSchema, false); err != nil {
+ if err := checkArrowSchemaCompat(currentSchema,
arrSchema, false); err != nil {
panic(err)
}
statistics :=
dataFileStatsFromParquetMetadata(rdr.MetaData(),
- must(computeStatsPlan(meta.CurrentSchema(),
meta.props)),
-
must(parquetPathToIDMapping(meta.CurrentSchema())))
-
- df, err := statistics.toDataFile(meta.CurrentSpec(),
filePath, iceberg.ParquetFile, rdr.MetaData().GetSourceFileSize())
- if err != nil {
- panic(err)
- }
+ must(computeStatsPlan(currentSchema,
meta.props)),
+ must(parquetPathToIDMapping(currentSchema)))
+ df := statistics.toDataFile(currentSchema, currentSpec,
filePath, iceberg.ParquetFile, rdr.MetaData().GetSourceFileSize())
if !yield(df, nil) {
return
}
diff --git a/table/arrow_utils_internal_test.go
b/table/arrow_utils_internal_test.go
index 5584d50..3b478e9 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -197,11 +197,9 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta
iceberg.Properties, writeSt
suite.Require().NoError(err)
stats := dataFileStatsFromParquetMetadata(fileMeta, collector, mapping)
- df, err := stats.toDataFile(tableMeta.PartitionSpec(),
"fake-path.parquet",
- iceberg.ParquetFile, fileMeta.GetSourceFileSize())
- suite.Require().NoError(err)
- return df
+ return stats.toDataFile(tableMeta.CurrentSchema(),
tableMeta.PartitionSpec(), "fake-path.parquet",
+ iceberg.ParquetFile, fileMeta.GetSourceFileSize())
}
func (suite *FileStatsMetricsSuite) TestRecordCount() {
@@ -345,14 +343,14 @@ func (suite *FileStatsMetricsSuite)
TestColumnMetricsMode() {
func (suite *FileStatsMetricsSuite) TestReadMissingStats() {
df := suite.getDataFile(nil, []string{"strings"})
- stringColIdx := 1
- suite.Len(df.LowerBoundValues(), 1)
- suite.Equal([]byte("aaaaaaaaaaaaaaaa"),
df.LowerBoundValues()[stringColIdx])
+ suite.Len(df.NullValueCounts(), 1)
suite.Len(df.UpperBoundValues(), 1)
- suite.Equal([]byte("zzzzzzzzzzzzzzz{"),
df.UpperBoundValues()[stringColIdx])
+ suite.Len(df.LowerBoundValues(), 1)
- suite.Len(df.NullValueCounts(), 1)
- suite.EqualValues(1, df.NullValueCounts()[stringColIdx])
+ stringsColIdx := 1
+ suite.Equal("aaaaaaaaaaaaaaaa",
string(df.LowerBoundValues()[stringsColIdx]))
+ suite.Equal("zzzzzzzzzzzzzzz{",
string(df.UpperBoundValues()[stringsColIdx]))
+ suite.EqualValues(1, df.NullValueCounts()[stringsColIdx])
}
func TestFileMetrics(t *testing.T) {
@@ -599,9 +597,8 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
require.NoError(t, err)
stats := dataFileStatsFromParquetMetadata(meta, collector, mapping)
- df, err := stats.toDataFile(tblMeta.PartitionSpec(),
"fake-path.parquet",
+ df := stats.toDataFile(tblMeta.CurrentSchema(),
tblMeta.PartitionSpec(), "fake-path.parquet",
iceberg.ParquetFile, meta.GetSourceFileSize())
- require.NoError(t, err)
assert.Len(t, df.ValueCounts(), 15)
assert.Len(t, df.NullValueCounts(), 15)
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index d1b5059..770a9be 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -339,7 +339,12 @@ func (p *pruneParquetSchema) Field(field
pqarrow.SchemaField, result arrow.Field
}
func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult
arrow.Field, mapping *iceberg.MappedField) arrow.Field {
- _, ok := p.selected[p.fieldID(*field.Children[0].Field, mapping)]
+ var elemMapping *iceberg.MappedField
+ if mapping != nil {
+ elemMapping = mapping.GetField("element")
+ }
+
+ _, ok := p.selected[p.fieldID(*field.Children[0].Field, elemMapping)]
if !ok {
if elemResult.Type != nil {
result := *field.Field
diff --git a/table/scanner.go b/table/scanner.go
index c0eaf93..a058d90 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -488,3 +488,26 @@ func (scan *Scan) ToArrowTable(ctx context.Context)
(arrow.Table, error) {
return array.NewTableFromRecords(schema, records), nil
}
+
+func partitionRecordValue(field iceberg.PartitionField, val iceberg.Literal,
schema *iceberg.Schema) (iceberg.Optional[iceberg.Literal], error) {
+ var ret iceberg.Optional[iceberg.Literal]
+ if val == nil {
+ return ret, nil
+ }
+
+ f, ok := schema.FindFieldByID(field.SourceID)
+ if !ok {
+ return ret, fmt.Errorf("%w: could not find source field in
schema for %s",
+ iceberg.ErrInvalidSchema, field.String())
+ }
+
+ out, err := val.To(f.Type)
+ if err != nil {
+ return ret, err
+ }
+
+ ret.Val = out
+ ret.Valid = true
+
+ return ret, nil
+}
diff --git a/table/snapshots.go b/table/snapshots.go
index b12c74a..45b21ad 100644
--- a/table/snapshots.go
+++ b/table/snapshots.go
@@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "iter"
"maps"
"slices"
"strconv"
@@ -306,6 +307,37 @@ func (s Snapshot) Manifests(fio iceio.IO)
([]iceberg.ManifestFile, error) {
return nil, nil
}
+func (s Snapshot) dataFiles(fio iceio.IO, fileFilter
set[iceberg.ManifestEntryContent]) iter.Seq2[iceberg.DataFile, error] {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ manifests, err := s.Manifests(fio)
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+
+ for _, m := range manifests {
+ dataFiles, err := m.FetchEntries(fio, false)
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+
+ for _, f := range dataFiles {
+ if fileFilter != nil {
+ if _, ok :=
fileFilter[f.DataFile().ContentType()]; !ok {
+ continue
+ }
+ }
+ if !yield(f.DataFile(), nil) {
+ return
+ }
+ }
+ }
+ }
+}
+
type MetadataLogEntry struct {
MetadataFile string `json:"metadata-file"`
TimestampMs int64 `json:"timestamp-ms"`
diff --git a/table/table.go b/table/table.go
index 2584b72..db679e6 100644
--- a/table/table.go
+++ b/table/table.go
@@ -27,6 +27,8 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
"github.com/apache/iceberg-go/io"
+ tblutils "github.com/apache/iceberg-go/table/internal"
+ "golang.org/x/sync/errgroup"
)
type Identifier = []string
@@ -86,6 +88,86 @@ func (t Table) NewTransaction() *Transaction {
}
}
+func (t Table) AllManifests() iter.Seq2[iceberg.ManifestFile, error] {
+ type list = tblutils.Enumerated[[]iceberg.ManifestFile]
+ g := errgroup.Group{}
+
+ n := len(t.metadata.Snapshots())
+ ch := make(chan list, n)
+ for i, sn := range t.metadata.Snapshots() {
+ g.Go(func() error {
+ manifests, err := sn.Manifests(t.fs)
+ if err != nil {
+ return err
+ }
+
+ ch <- list{Index: i, Value: manifests, Last: i == n-1}
+
+ return nil
+ })
+ }
+
+ errch := make(chan error, 1)
+ go func() {
+ defer close(errch)
+ defer close(ch)
+ if err := g.Wait(); err != nil {
+ errch <- err
+ }
+ }()
+
+ results := tblutils.MakeSequencedChan(uint(n), ch,
+ func(left, right *list) bool {
+ switch {
+ case left.Index < 0:
+ return true
+ case right.Index < 0:
+ return false
+ default:
+ return left.Index < right.Index
+ }
+ }, func(prev, next *list) bool {
+ if prev.Index < 0 {
+ return next.Index == 0
+ }
+
+ return next.Index == prev.Index+1
+ }, list{Index: -1})
+
+ return func(yield func(iceberg.ManifestFile, error) bool) {
+ defer func() {
+ // drain channels if we exited early
+ go func() {
+ for range results {
+ }
+ for range errch {
+ }
+ }()
+ }()
+
+ for {
+ select {
+ case err := <-errch:
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+ case next, ok := <-results:
+ for _, mf := range next.Value {
+ if !yield(mf, nil) {
+ return
+ }
+ }
+
+ if next.Last || !ok {
+ return
+ }
+ }
+ }
+ }
+}
+
func (t Table) doCommit(ctx context.Context, updates []Update, reqs
[]Requirement) (*Table, error) {
newMeta, newLoc, err := t.cat.CommitTable(ctx, &t, reqs, updates)
if err != nil {
diff --git a/table/table_test.go b/table/table_test.go
index 8ae66d7..044fa01 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -150,6 +150,10 @@ type TableWritingTestSuite struct {
arrSchemaUpdated *arrow.Schema
arrTblUpdated arrow.Table
+ tableSchemaPromotedTypes *iceberg.Schema
+ arrSchemaPromotedTypes *arrow.Schema
+ arrTablePromotedTypes arrow.Table
+
location string
formatVersion int
}
@@ -211,6 +215,44 @@ func (t *TableWritingTestSuite) SetupSuite() {
`[{"foo": true, "baz": 123, "qux": "2024-03-07", "quux": 234}]`,
})
t.Require().NoError(err)
+
+ t.tableSchemaPromotedTypes = iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "long", Type:
iceberg.PrimitiveTypes.Int64},
+ iceberg.NestedField{
+ ID: 2, Name: "list",
+ Type: &iceberg.ListType{ElementID: 4, Element:
iceberg.PrimitiveTypes.Int64},
+ Required: true,
+ },
+ iceberg.NestedField{
+ ID: 3, Name: "map",
+ Type: &iceberg.MapType{
+ KeyID: 5,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 6,
+ ValueType: iceberg.PrimitiveTypes.Int64,
+ },
+ Required: true,
+ },
+ iceberg.NestedField{ID: 7, Name: "double", Type:
iceberg.PrimitiveTypes.Float64})
+ // arrow-go needs to implement cast_extension for [16]byte -> uuid
+ // iceberg.NestedField{ID: 8, Name: "uuid", Type:
iceberg.PrimitiveTypes.UUID})
+
+ t.arrSchemaPromotedTypes = arrow.NewSchema([]arrow.Field{
+ {Name: "long", Type: arrow.PrimitiveTypes.Int32, Nullable:
true},
+ {Name: "list", Type: arrow.ListOf(arrow.PrimitiveTypes.Int32),
Nullable: false},
+ {Name: "map", Type: arrow.MapOf(arrow.BinaryTypes.String,
arrow.PrimitiveTypes.Int32), Nullable: false},
+ {Name: "double", Type: arrow.PrimitiveTypes.Float32, Nullable:
true},
+ }, nil)
+ // arrow-go needs to implement cast_extension for [16]byte -> uuid
+ // {Name: "uuid", Type: &arrow.FixedSizeBinaryType{ByteWidth: 16},
Nullable: true}}, nil)
+
+ t.arrTablePromotedTypes, err = array.TableFromJSON(mem,
t.arrSchemaPromotedTypes, []string{
+ `[
+ {"long": 1, "list": [1, 1], "map": [{"key": "a",
"value": 1}], "double": 1.1, "uuid": "cVp4705TQImb+TrQ7pv1RQ=="},
+ {"long": 9, "list": [2, 2], "map": [{"key": "b",
"value": 2}], "double": 9.2, "uuid": "l12HVF5KREqWl/R25AMM3g=="}
+ ]`,
+ })
+ t.Require().NoError(err)
}
func (t *TableWritingTestSuite) SetupTest() {
@@ -367,6 +409,326 @@ func (t *TableWritingTestSuite)
TestAddFilesFailsSchemaMismatch() {
`)
}
+func (t *TableWritingTestSuite) TestAddFilesPartitionedTable() {
+ ident := table.Identifier{"default", "partitioned_table_v" +
strconv.Itoa(t.formatVersion)}
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "baz"},
+ iceberg.PartitionField{SourceID: 10, FieldID: 1001, Transform:
iceberg.MonthTransform{}, Name: "qux_month"})
+
+ tbl := t.createTable(ident, t.formatVersion,
+ spec, t.tableSchema)
+
+ t.NotNil(tbl)
+
+ dates := []string{
+ "2024-03-07", "2024-03-08", "2024-03-16", "2024-03-18",
"2024-03-19",
+ }
+
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet",
t.location, i)
+ table, err := array.TableFromJSON(memory.DefaultAllocator,
t.arrSchema, []string{
+ `[{"foo": true, "bar": "bar_string", "baz": 123, "qux":
"` + dates[i] + `"}]`,
+ })
+ t.Require().NoError(err)
+ defer table.Release()
+
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+ files = append(files, filePath)
+ }
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(files, nil, false))
+
+ stagedTbl, err := tx.StagedTable()
+ t.Require().NoError(err)
+ t.NotNil(stagedTbl.NameMapping())
+
+ t.Equal(stagedTbl.CurrentSnapshot().Summary,
+ &table.Summary{
+ Operation: table.OpAppend,
+ Properties: iceberg.Properties{
+ "added-data-files": "5",
+ "added-files-size": "3660",
+ "added-records": "5",
+ "changed-partition-count": "1",
+ "total-data-files": "5",
+ "total-delete-files": "0",
+ "total-equality-deletes": "0",
+ "total-files-size": "3660",
+ "total-position-deletes": "0",
+ "total-records": "5",
+ },
+ })
+
+ m, err := stagedTbl.CurrentSnapshot().Manifests(tbl.FS())
+ t.Require().NoError(err)
+
+ for _, manifest := range m {
+ entries, err := manifest.FetchEntries(tbl.FS(), false)
+ t.Require().NoError(err)
+
+ for _, e := range entries {
+ t.Equal(map[string]any{
+ "baz": 123, "qux_month": 650,
+ }, e.DataFile().Partition())
+ }
+ }
+}
+
+func (t *TableWritingTestSuite) TestAddFilesToBucketPartitionedTableFails() {
+ ident := table.Identifier{"default", "partitioned_table_bucket_fails_v"
+ strconv.Itoa(t.formatVersion)}
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.BucketTransform{NumBuckets: 3}, Name: "baz_bucket_3"})
+
+ tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet",
t.location, i)
+ table, err := array.TableFromJSON(memory.DefaultAllocator,
t.arrSchema, []string{
+ `[{"foo": true, "bar": "bar_string", "baz": ` +
strconv.Itoa(i) + `, "qux": "2024-03-07"}]`,
+ })
+ t.Require().NoError(err)
+ defer table.Release()
+
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+ files = append(files, filePath)
+ }
+
+ tx := tbl.NewTransaction()
+ err := tx.AddFiles(files, nil, false)
+ t.Error(err)
+ t.ErrorContains(err, "cannot infer partition value from parquet
metadata for a non-linear partition field: baz_bucket_3 with transform
bucket[3]")
+}
+
+func (t *TableWritingTestSuite)
TestAddFilesToPartitionedTableFailsLowerAndUpperMismatch() {
+ ident := table.Identifier{"default", "partitioned_table_bucket_fails_v"
+ strconv.Itoa(t.formatVersion)}
+ spec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "baz"})
+
+ tbl := t.createTable(ident, t.formatVersion, spec, t.tableSchema)
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath := fmt.Sprintf("%s/partitioned_table/test-%d.parquet",
t.location, i)
+ table, err := array.TableFromJSON(memory.DefaultAllocator,
t.arrSchema, []string{
+ `[
+ {"foo": true, "bar": "bar_string", "baz": 123,
"qux": "2024-03-07"},
+ {"foo": true, "bar": "bar_string", "baz": 124,
"qux": "2024-03-07"}
+ ]`,
+ })
+ t.Require().NoError(err)
+ defer table.Release()
+
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, table)
+ files = append(files, filePath)
+ }
+
+ tx := tbl.NewTransaction()
+ err := tx.AddFiles(files, nil, false)
+ t.Error(err)
+ t.ErrorContains(err, "cannot infer partition value from parquet
metadata as there is more than one value for partition field: baz. (low: 123,
high: 124)")
+}
+
+func (t *TableWritingTestSuite) TestAddFilesWithLargeAndRegular() {
+ ident := table.Identifier{"default", "unpartitioned_with_large_types_v"
+ strconv.Itoa(t.formatVersion)}
+ ice := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String, Required: true})
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.BinaryTypes.String},
+ }, nil)
+ arrowSchemaLarge := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.BinaryTypes.LargeString},
+ }, nil)
+
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, ice)
+ t.Require().NotNil(tbl)
+
+ filePath :=
fmt.Sprintf("%s/unpartitioned_with_large_types/v%d/test-0.parquet", t.location,
t.formatVersion)
+ filePathLarge :=
fmt.Sprintf("%s/unpartitioned_with_large_types/v%d/test-1.parquet", t.location,
t.formatVersion)
+
+ arrTable, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchema, []string{
+ `[{"foo": "bar"}]`,
+ })
+ t.Require().NoError(err)
+ defer arrTable.Release()
+
+ arrTableLarge, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchemaLarge,
+ []string{`[{"foo": "bar"}]`})
+ t.Require().NoError(err)
+ defer arrTableLarge.Release()
+
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, arrTable)
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePathLarge,
arrTableLarge)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles([]string{filePath, filePathLarge}, nil,
false))
+
+ stagedTbl, err := tx.StagedTable()
+ t.Require().NoError(err)
+
+ result, err := stagedTbl.Scan(table.WithOptions(iceberg.Properties{
+ table.ScanOptionArrowUseLargeTypes: "true",
+ })).ToArrowTable(context.Background())
+ t.Require().NoError(err)
+ defer result.Release()
+
+ t.EqualValues(2, result.NumRows())
+ t.Truef(arrowSchemaLarge.Equal(result.Schema()), "expected schema: %s,
got: %s", arrowSchemaLarge, result.Schema())
+}
+
+func (t *TableWritingTestSuite) TestAddFilesValidUpcast() {
+ ident := table.Identifier{"default", "test_table_with_valid_upcast_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchemaPromotedTypes)
+
+ filePath :=
fmt.Sprintf("%s/test_table_with_valid_upcast_v%d/test.parquet", t.location,
t.formatVersion)
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath,
t.arrTablePromotedTypes)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles([]string{filePath}, nil, false))
+
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+
+ written, err := staged.Scan().ToArrowTable(context.Background())
+ t.Require().NoError(err)
+ defer written.Release()
+
+ t.EqualValues(2, written.NumRows())
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "long", Type: arrow.PrimitiveTypes.Int64, Nullable:
true},
+ {Name: "list", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64),
Nullable: false},
+ {Name: "map", Type: arrow.MapOf(arrow.BinaryTypes.String,
arrow.PrimitiveTypes.Int64), Nullable: false},
+ {Name: "double", Type: arrow.PrimitiveTypes.Float64, Nullable:
true},
+ }, nil)
+ t.True(expectedSchema.Equal(written.Schema()))
+}
+
+func dropColFromTable(idx int, tbl arrow.Table) arrow.Table {
+ if idx < 0 || idx >= int(tbl.NumCols()) {
+ panic("invalid column to drop")
+ }
+
+ fields := tbl.Schema().Fields()
+ fields = append(fields[:idx], fields[idx+1:]...)
+
+ cols := make([]arrow.Column, 0, tbl.NumCols()-1)
+ for i := 0; i < int(tbl.NumCols()); i++ {
+ if i == idx {
+ continue
+ }
+ cols = append(cols, *tbl.Column(i))
+ }
+
+ return array.NewTable(arrow.NewSchema(fields, nil), cols, tbl.NumRows())
+}
+
+func (t *TableWritingTestSuite) TestAddFilesSubsetOfSchema() {
+ ident := table.Identifier{"default",
"test_table_with_subset_of_schema_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath :=
fmt.Sprintf("%s/test_table_with_subset_of_schema_v%d/test.parquet", t.location,
t.formatVersion)
+ withoutCol := dropColFromTable(0, t.arrTbl)
+ defer withoutCol.Release()
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, withoutCol)
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles([]string{filePath}, nil, false))
+
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+
+ written, err := staged.Scan().ToArrowTable(context.Background())
+ t.Require().NoError(err)
+ defer written.Release()
+
+ t.EqualValues(1, written.NumRows())
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.FixedWidthTypes.Boolean, Nullable:
true},
+ {Name: "bar", Type: arrow.BinaryTypes.String, Nullable: true},
+ {Name: "baz", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+ {Name: "qux", Type: arrow.PrimitiveTypes.Date32, Nullable:
true},
+ }, nil)
+ t.True(expectedSchema.Equal(written.Schema()), expectedSchema.String(),
written.Schema().String())
+
+ result, err := array.TableFromJSON(memory.DefaultAllocator,
t.arrSchema, []string{
+ `[{"foo": null, "bar": "bar_string", "baz": 123, "qux":
"2024-03-07"}]`,
+ })
+ t.Require().NoError(err)
+ defer result.Release()
+
+ t.True(array.TableEqual(result, written))
+}
+
+func (t *TableWritingTestSuite) TestAddFilesDuplicateFilesInFilePaths() {
+ ident := table.Identifier{"default",
"test_table_with_duplicate_files_v" + strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
*iceberg.UnpartitionedSpec, t.tableSchema)
+
+ filePath :=
fmt.Sprintf("%s/test_table_with_duplicate_files_v%d/test.parquet", t.location,
t.formatVersion)
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+
+ tx := tbl.NewTransaction()
+ err := tx.AddFiles([]string{filePath, filePath}, nil, false)
+ t.Error(err)
+ t.ErrorContains(err, "file paths must be unique for AddFiles")
+}
+
+func (t *TableWritingTestSuite) TestAddFilesReferencedByCurrentSnapshot() {
+ ident := table.Identifier{"default", "add_files_referenced_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
+ *iceberg.UnpartitionedSpec, t.tableSchema)
+
+ t.NotNil(tbl)
+
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath :=
fmt.Sprintf("%s/add_files_referenced/test-%d.parquet", t.location, i)
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+ files = append(files, filePath)
+ }
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(files, nil, false))
+
+ err := tx.AddFiles(files[len(files)-1:], nil, false)
+ t.Error(err)
+ t.ErrorContains(err, "cannot add files that are already referenced by
table, files:")
+}
+
+func (t *TableWritingTestSuite)
TestAddFilesReferencedCurrentSnapshotIgnoreDuplicates() {
+ ident := table.Identifier{"default", "add_files_referenced_v" +
strconv.Itoa(t.formatVersion)}
+ tbl := t.createTable(ident, t.formatVersion,
+ *iceberg.UnpartitionedSpec, t.tableSchema)
+
+ t.NotNil(tbl)
+
+ files := make([]string, 0)
+ for i := range 5 {
+ filePath :=
fmt.Sprintf("%s/add_files_referenced/test-%d.parquet", t.location, i)
+ t.writeParquet(tbl.FS().(iceio.WriteFileIO), filePath, t.arrTbl)
+ files = append(files, filePath)
+ }
+
+ tx := tbl.NewTransaction()
+ t.Require().NoError(tx.AddFiles(files, nil, false))
+
+ t.Require().NoError(tx.AddFiles(files[len(files)-1:], nil, true))
+ staged, err := tx.StagedTable()
+ t.Require().NoError(err)
+
+ added, existing, deleted := []int32{}, []int32{}, []int32{}
+ for m, err := range staged.AllManifests() {
+ t.Require().NoError(err)
+ added = append(added, m.AddedDataFiles())
+ existing = append(existing, m.ExistingDataFiles())
+ deleted = append(deleted, m.DeletedDataFiles())
+ }
+
+ t.Equal([]int32{5, 1, 5}, added)
+ t.Equal([]int32{0, 0, 0}, existing)
+ t.Equal([]int32{0, 0, 0}, deleted)
+}
+
func TestTableWriting(t *testing.T) {
suite.Run(t, &TableWritingTestSuite{formatVersion: 1})
suite.Run(t, &TableWritingTestSuite{formatVersion: 2})
diff --git a/table/transaction.go b/table/transaction.go
index 15785b5..bdbc62b 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"slices"
"sync"
"time"
@@ -119,18 +120,32 @@ func (t *Transaction) Append(rdr array.RecordReader,
snapshotProps iceberg.Prope
return iceberg.ErrNotImplemented
}
-func (t *Transaction) AddFiles(files []string, snapshotProps
iceberg.Properties, checkDuplicates bool) error {
+func (t *Transaction) AddFiles(files []string, snapshotProps
iceberg.Properties, ignoreDuplicates bool) error {
set := make(map[string]string)
for _, f := range files {
set[f] = f
}
if len(set) != len(files) {
- return errors.New("file paths must be unique for
AppendDataFiles")
- }
-
- if checkDuplicates {
- return iceberg.ErrNotImplemented
+ return errors.New("file paths must be unique for AddFiles")
+ }
+
+ if !ignoreDuplicates {
+ if s := t.meta.currentSnapshot(); s != nil {
+ referenced := make([]string, 0)
+ for df, err := range s.dataFiles(t.tbl.fs, nil) {
+ if err != nil {
+ return err
+ }
+
+ if _, ok := set[df.FilePath()]; ok {
+ referenced = append(referenced,
df.FilePath())
+ }
+ }
+ if len(referenced) > 0 {
+ return fmt.Errorf("cannot add files that are
already referenced by table, files: %s", referenced)
+ }
+ }
}
if t.meta.NameMapping() == nil {
diff --git a/transforms.go b/transforms.go
index 3066ea7..fb37e6c 100644
--- a/transforms.go
+++ b/transforms.go
@@ -86,6 +86,7 @@ type Transform interface {
fmt.Stringer
encoding.TextMarshaler
ResultType(t Type) Type
+ PreservesOrder() bool
Equals(Transform) bool
Apply(Optional[Literal]) Optional[Literal]
Project(name string, pred BoundPredicate) (UnboundPredicate, error)
@@ -104,6 +105,7 @@ func (t IdentityTransform) MarshalText() ([]byte, error) {
func (IdentityTransform) String() string { return "identity" }
func (IdentityTransform) ResultType(t Type) Type { return t }
+func (IdentityTransform) PreservesOrder() bool { return true }
func (IdentityTransform) Equals(other Transform) bool {
_, ok := other.(IdentityTransform)
@@ -161,6 +163,7 @@ func (t VoidTransform) MarshalText() ([]byte, error) {
func (VoidTransform) String() string { return "void" }
func (VoidTransform) ResultType(t Type) Type { return t }
+func (VoidTransform) PreservesOrder() bool { return false }
func (VoidTransform) Equals(other Transform) bool {
_, ok := other.(VoidTransform)
@@ -193,6 +196,7 @@ func (t BucketTransform) MarshalText() ([]byte, error) {
func (t BucketTransform) String() string { return fmt.Sprintf("bucket[%d]",
t.NumBuckets) }
func (BucketTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (BucketTransform) PreservesOrder() bool { return false }
func hashHelperInt[T ~int32 | ~int64](v any) uint32 {
var (
@@ -353,7 +357,7 @@ func (t TruncateTransform) MarshalText() ([]byte, error) {
func (t TruncateTransform) String() string { return
fmt.Sprintf("truncate[%d]", t.Width) }
func (TruncateTransform) ResultType(t Type) Type { return t }
-
+func (TruncateTransform) PreservesOrder() bool { return true }
func (t TruncateTransform) Equals(other Transform) bool {
rhs, ok := other.(TruncateTransform)
if !ok {
@@ -550,6 +554,7 @@ func (t YearTransform) MarshalText() ([]byte, error) {
func (YearTransform) String() string { return "year" }
func (YearTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (YearTransform) PreservesOrder() bool { return true }
func (YearTransform) Equals(other Transform) bool {
_, ok := other.(YearTransform)
@@ -627,6 +632,7 @@ func (t MonthTransform) MarshalText() ([]byte, error) {
func (MonthTransform) String() string { return "month" }
func (MonthTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (MonthTransform) PreservesOrder() bool { return true }
func (MonthTransform) Equals(other Transform) bool {
_, ok := other.(MonthTransform)
@@ -715,6 +721,7 @@ func (t DayTransform) MarshalText() ([]byte, error) {
func (DayTransform) String() string { return "day" }
func (DayTransform) ResultType(Type) Type { return PrimitiveTypes.Date }
+func (DayTransform) PreservesOrder() bool { return true }
func (DayTransform) Equals(other Transform) bool {
_, ok := other.(DayTransform)
@@ -792,6 +799,7 @@ func (t HourTransform) MarshalText() ([]byte, error) {
func (HourTransform) String() string { return "hour" }
func (HourTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+func (HourTransform) PreservesOrder() bool { return true }
func (HourTransform) Equals(other Transform) bool {
_, ok := other.(HourTransform)