This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new efe96b6f feat: add wire codec helpers for DataFile and FileScanTask (#1075)
efe96b6f is described below
commit efe96b6f826d674c9edb0e09f6558dedc524e16b
Author: Tobias Pütz <[email protected]>
AuthorDate: Wed May 20 09:54:21 2026 +0200
feat: add wire codec helpers for DataFile and FileScanTask (#1075)
Adds on-the-wire encoding for DataFile and FileScanTask to enable
distributed compaction. With #1033, we decomposed compaction into a
coordinator and a worker portion; these helpers connect the two over
the wire. FileScanTasks travel coordinator -> worker (data file,
delete files, scan range, and v3 row lineage), and DataFiles travel
worker -> coordinator, where they are collected for the commit.
iceberg.EncodeDataFile / DecodeDataFile reuse the manifest-entry Avro
encoding so the bytes on the wire are the same bytes a manifest carries.
The dataFile struct's avro tags remain the single source of truth.
Adding a field to DataFile extends what the helpers transport, with no
wire mirror to keep in sync. Encoding is non-mutating and thread-safe: a
fresh *dataFile is cloned via reflection over avro tags, so the source
is untouched.
table.EncodeFileScanTask / DecodeFileScanTask layer on top: each
embedded DataFile is iceberg-encoded, then wrapped alongside the scan
range and v3 row lineage in a small Avro envelope.
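For illustration, the sketch below shows the Avro binary encoding of an envelope with the same field order as the `file_scan_task` schema in this commit. It is hand-rolled with `encoding/binary` rather than the `github.com/twmb/avro` library the commit actually uses, and all type and function names are hypothetical. Each DataFile payload is opaque `bytes`. Arrays are written as one block followed by a zero terminator, and nullable longs are written as the `["null", "long"]` union branch index followed by the value.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// envelope mirrors the field order of the file_scan_task record; each
// []byte is an opaque, already-encoded DataFile.
type envelope struct {
	File                []byte
	DeleteFiles         [][]byte
	EqualityDeleteFiles [][]byte
	DeletionVectorFiles [][]byte
	Start, Length       int64
	FirstRowID          *int64
	DataSequenceNumber  *int64
}

// appendLong writes an Avro long: a zig-zag, base-128 varint, which is
// exactly what binary.AppendVarint produces.
func appendLong(b []byte, v int64) []byte { return binary.AppendVarint(b, v) }

// appendBytes writes Avro bytes: the length as a long, then the raw bytes.
func appendBytes(b, p []byte) []byte { return append(appendLong(b, int64(len(p))), p...) }

// appendBytesArray writes array<bytes> as one block (count, items)
// followed by the zero-count terminator.
func appendBytesArray(b []byte, items [][]byte) []byte {
	if len(items) > 0 {
		b = appendLong(b, int64(len(items)))
		for _, it := range items {
			b = appendBytes(b, it)
		}
	}
	return appendLong(b, 0)
}

// appendOptionalLong writes a ["null","long"] union: branch 0 for null,
// or branch 1 followed by the long.
func appendOptionalLong(b []byte, v *int64) []byte {
	if v == nil {
		return appendLong(b, 0)
	}
	return appendLong(appendLong(b, 1), *v)
}

// encodeEnvelope concatenates the fields in schema order; Avro binary
// records carry no field names on the wire.
func encodeEnvelope(e envelope) []byte {
	var b []byte
	b = appendBytes(b, e.File)
	b = appendBytesArray(b, e.DeleteFiles)
	b = appendBytesArray(b, e.EqualityDeleteFiles)
	b = appendBytesArray(b, e.DeletionVectorFiles)
	b = appendLong(b, e.Start)
	b = appendLong(b, e.Length)
	b = appendOptionalLong(b, e.FirstRowID)
	return appendOptionalLong(b, e.DataSequenceNumber)
}

func main() {
	seq := int64(3)
	out := encodeEnvelope(envelope{
		File:               []byte{0xCA, 0xFE},
		DeleteFiles:        [][]byte{{0x01}},
		Length:             64,
		DataSequenceNumber: &seq,
	})
	fmt.Printf("% x\n", out)
}
```

This prints `04 ca fe 02 02 01 00 00 00 00 80 01 00 02 06`. A real encoder may split arrays into blocks differently, and any conforming Avro reader accepts either form, so byte-for-byte equality with the library's output is not guaranteed.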
Design notes:
- The receiver supplies (spec, schema, version) out of band. Both sides
in the distributed-compaction design already hold table metadata, and
the per-(specID, version) avro schema is cached. Happy to switch to a
self-describing payload if preferred.
- distinct_counts round-trips on v1/v2 (thanks to #1044, which this PR
is rebased on); v3 omits it per spec (deprecated in v3,
apache/iceberg#12182). v3 callers that need it must transport it
separately.
- Reflection runs once per encode and is not in a hot path I can see.
Happy to precompute the avro field index at init if preferred.
- An anonymous `var _ = FileScanTask{...}` literal next to the codec is
a compile-time drift guard: adding/retyping/reordering a FileScanTask
field breaks the build, forcing a deliberate call on whether it must
cross the wire.
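The drift guard relies on Go's conversion rule: a struct value converts to another struct type only when both have identical underlying types, with struct tags ignored. Below is a minimal sketch using hypothetical, trimmed-down types, not the real `table.FileScanTask`.

```go
package main

import "fmt"

// FileScanTask is a simplified, hypothetical stand-in for table.FileScanTask.
type FileScanTask struct {
	File          string
	Start, Length int64
	FirstRowID    *int64
}

// taskShape must list the same fields, in the same order, with the same
// names and types. Tags may differ because conversion ignores them.
type taskShape struct {
	File          string `note:"tags are ignored by conversion"`
	Start, Length int64
	FirstRowID    *int64
}

// Compile-time guard: this only type-checks while the field sequences
// match. Adding, removing, renaming, retyping, or reordering a
// FileScanTask field breaks the build here.
var _ = FileScanTask(taskShape{})

func main() {
	task := FileScanTask(taskShape{File: "s3://bucket/a.parquet", Length: 128})
	fmt.Println(task.File, task.Length)
}
```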
Format versions 1, 2, and 3 are supported. No change to existing
manifest read/write paths.
Tests: round-trip across v1/v2/v3 with fully populated DataFiles,
foreign-impl rejection, partition-data idempotence, and FileScanTask
shape including v3 row lineage.
cc @laskoviymishka, this continues the building blocks for distributed
compaction.
---
.github/workflows/go-ci.yml | 3 +
Makefile | 6 +-
codec/data_file.go | 111 ++++++++++++++
codec/data_file_test.go | 321 ++++++++++++++++++++++++++++++++++++++++
codec/file_scan_task.go | 208 ++++++++++++++++++++++++++
codec/file_scan_task_test.go | 198 +++++++++++++++++++++++++
data_file_codec.go | 298 +++++++++++++++++++++++++++++++++++++
data_file_codec_test.go | 234 +++++++++++++++++++++++++++++
go.mod | 1 +
internal/datafileavro/bridge.go | 37 +++++
10 files changed, 1416 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml
index 06872065..ad782580 100644
--- a/.github/workflows/go-ci.yml
+++ b/.github/workflows/go-ci.yml
@@ -57,3 +57,6 @@ jobs:
args: --timeout=10m
- name: Run tests
run: go test -v ./...
+ # Race detector is opt-in per package/test.
+ - name: Run race detector
+ run: go test -race -v ./codec/...
diff --git a/Makefile b/Makefile
index 355a4aed..9ada641c 100644
--- a/Makefile
+++ b/Makefile
@@ -17,11 +17,15 @@
# golangci-lint version (keep in sync with CI and README)
GOLANGCI_LINT_VERSION := v2.8.0
-.PHONY: test lint lint-install integration-setup integration-test integration-scanner integration-io integration-rest integration-spark integration-hadoop integration-down integration-logs docs-gen
+.PHONY: test test-race lint lint-install integration-setup integration-test integration-scanner integration-io integration-rest integration-spark integration-hadoop integration-down integration-logs docs-gen
test:
go test -v ./...
+# Race detector is opt-in per package/test.
+test-race:
+ go test -race -v ./codec/...
+
docs-gen:
go run ./website/gen
diff --git a/codec/data_file.go b/codec/data_file.go
new file mode 100644
index 00000000..b1f78fb1
--- /dev/null
+++ b/codec/data_file.go
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package codec encodes and decodes iceberg-go values for cross-process
+// transport. The bytes it produces are the same Avro bytes a manifest
+// carries for the corresponding value, so callers transport iceberg
+// internals without inventing a parallel wire schema.
+//
+// EncodeDataFile / DecodeDataFile move a single [iceberg.DataFile]
+// using the manifest-entry encoding for a given partition spec, table
+// schema, and format version.
+//
+// EncodeFileScanTask / DecodeFileScanTask layer on top: each embedded
+// DataFile is encoded with [EncodeDataFile], then wrapped alongside the
+// scan range and v3 row lineage in a small Avro envelope.
+//
+// The receiver supplies (spec, schema, version) out of band. Both sides
+// in a distributed-processing design already hold table metadata, and
+// the per-(partition-type, version) avro schema is cached.
+package codec
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/internal/datafileavro"
+)
+
+// EncodeDataFile encodes a single DataFile for cross-process transport
+// using the manifest-entry Avro encoding for the given partition spec,
+// table schema and format version (1, 2, or 3). The wire format is the
+// same one a manifest carries for this data file. The receiver MUST
+// call [DecodeDataFile] with the matching (spec, schema, version)
+// triple.
+//
+// df must implement [iceberg.AvroEntryMarshaler]. The iceberg
+// package's built-in DataFile implementation satisfies it; external
+// implementations of [iceberg.DataFile] can opt in by implementing
+// the marshaler interface themselves.
+//
+// EncodeDataFile is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile, provided the
+// underlying implementation honors that contract.
+//
+// v1 note: v1 manifest entries carry a non-nullable snapshot_id which
+// is written as 0 by the iceberg implementation. v1 bytes are not
+// usable as a standalone manifest entry — they only round-trip via
+// [DecodeDataFile].
+//
+// distinct_counts (field 111) is deprecated in the spec for all
+// versions. Already-set values round-trip on v1 and v2 as a
+// read-compatibility artifact; v3 omits the field entirely
+// (apache/iceberg#12182). New DataFiles should not set distinct
+// counts.
+func EncodeDataFile(df iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) {
+ if version < 1 || version > 3 {
+ return nil, fmt.Errorf("codec: EncodeDataFile: unsupported format version %d", version)
+ }
+ m, ok := df.(iceberg.AvroEntryMarshaler)
+ if !ok {
+ return nil, fmt.Errorf("codec: EncodeDataFile: requires a DataFile implementing iceberg.AvroEntryMarshaler, got %T", df)
+ }
+ b, err := m.MarshalAvroEntry(spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("codec: EncodeDataFile: %w", err)
+ }
+
+ return b, nil
+}
+
+// DecodeDataFile decodes bytes produced by [EncodeDataFile] back into a
+// DataFile. The (spec, schema, version) triple must match the encoder;
+// passing a different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func DecodeDataFile(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (iceberg.DataFile, error) {
+ if version < 1 || version > 3 {
+ return nil, fmt.Errorf("codec: DecodeDataFile: unsupported format version %d", version)
+ }
+ if datafileavro.Unmarshal == nil {
+ return nil, errors.New("codec: DecodeDataFile: bridge not initialized; the iceberg package must be imported to register the decoder")
+ }
+ res, err := datafileavro.Unmarshal(data, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("codec: DecodeDataFile: %w", err)
+ }
+ df, ok := res.(iceberg.DataFile)
+ if !ok {
+ return nil, fmt.Errorf("codec: DecodeDataFile: bridge returned unexpected type %T", res)
+ }
+
+ return df, nil
+}
diff --git a/codec/data_file_test.go b/codec/data_file_test.go
new file mode 100644
index 00000000..99796b4d
--- /dev/null
+++ b/codec/data_file_test.go
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+ "reflect"
+ "sort"
+ "strconv"
+ "sync"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/codec"
+ "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeDataFileRoundTrip(t *testing.T) {
+ for _, version := range []int{1, 2, 3} {
+ t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+ spec, schema, original := fullyPopulatedDataFile(t, version)
+
+ bytes, err := codec.EncodeDataFile(original, spec, schema, version)
+ require.NoError(t, err)
+ require.NotEmpty(t, bytes)
+
+ decoded, err := codec.DecodeDataFile(bytes, spec, schema, version)
+ require.NoError(t, err)
+ require.NotNil(t, decoded)
+
+ assertDataFileEqual(t, original, decoded, version)
+ })
+ }
+}
+
+func TestEncodeDataFileRejectsForeignImpl(t *testing.T) {
+ spec, schema, _ := fullyPopulatedDataFile(t, 2)
+ _, err := codec.EncodeDataFile(stubDataFile{}, spec, schema, 2)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "iceberg.AvroEntryMarshaler",
+ "error should name the interface the caller needs to satisfy")
+ require.Contains(t, err.Error(), "codec: EncodeDataFile:",
+ "codec errors should be prefixed with the codec entry point name")
+}
+
+func TestEncodeDataFileRejectsBadVersion(t *testing.T) {
+ spec, schema, df := fullyPopulatedDataFile(t, 2)
+ for _, v := range []int{0, 4, -1} {
+ _, err := codec.EncodeDataFile(df, spec, schema, v)
+ require.Error(t, err, "version %d must be rejected", v)
+ require.Contains(t, err.Error(), "codec: EncodeDataFile:")
+ require.Contains(t, err.Error(), "unsupported format version")
+ }
+}
+
+func TestDecodeDataFileRejectsBadVersion(t *testing.T) {
+ spec, schema, df := fullyPopulatedDataFile(t, 2)
+ encoded, err := codec.EncodeDataFile(df, spec, schema, 2)
+ require.NoError(t, err)
+ for _, v := range []int{0, 4, -1} {
+ _, err := codec.DecodeDataFile(encoded, spec, schema, v)
+ require.Error(t, err, "version %d must be rejected", v)
+ require.Contains(t, err.Error(), "codec: DecodeDataFile:")
+ require.Contains(t, err.Error(), "unsupported format version")
+ }
+}
+
+func TestDecodeDataFileWrapsDecodeError(t *testing.T) {
+ spec, schema, _ := fullyPopulatedDataFile(t, 2)
+ _, err := codec.DecodeDataFile([]byte{0xCA, 0xFE}, spec, schema, 2)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "codec: DecodeDataFile:",
+ "decode-side errors from the bridge should be wrapped with the codec entry point")
+}
+
+// TestEncodeDataFileAcceptsExternalMarshaler exercises the
+// extensibility contract: any DataFile that also implements
+// [iceberg.AvroEntryMarshaler] is accepted by the codec, not only the
+// iceberg package's built-in implementation.
+func TestEncodeDataFileAcceptsExternalMarshaler(t *testing.T) {
+ spec, schema, _ := fullyPopulatedDataFile(t, 2)
+ got, err := codec.EncodeDataFile(externalMarshaler{payload: []byte{0xCA, 0xFE}}, spec, schema, 2)
+ require.NoError(t, err)
+ require.Equal(t, []byte{0xCA, 0xFE}, got,
+ "the marshaler interface contract must let external impls supply their own bytes")
+}
+
+type externalMarshaler struct {
+ stubDataFile
+ payload []byte
+}
+
+func (e externalMarshaler) MarshalAvroEntry(iceberg.PartitionSpec, *iceberg.Schema, int) ([]byte, error) {
+ return e.payload, nil
+}
+
+func TestEncodeDataFileIdempotent(t *testing.T) {
+ spec, schema, df := fullyPopulatedDataFile(t, 2)
+ first, err := codec.EncodeDataFile(df, spec, schema, 2)
+ require.NoError(t, err)
+ second, err := codec.EncodeDataFile(df, spec, schema, 2)
+ require.NoError(t, err)
+ require.Equal(t, first, second, "repeated encodes must produce identical bytes")
+}
+
+func TestEncodeDataFileConcurrent(t *testing.T) {
+ spec, schema, df := fullyPopulatedDataFile(t, 2)
+ want, err := codec.EncodeDataFile(df, spec, schema, 2)
+ require.NoError(t, err)
+
+ const goroutines = 16
+ const iterations = 32
+ results := make(chan []byte, goroutines*iterations)
+ errs := make(chan error, goroutines*iterations)
+ var wg sync.WaitGroup
+ for range goroutines {
+ wg.Go(func() {
+ for range iterations {
+ b, err := codec.EncodeDataFile(df, spec, schema, 2)
+ if err != nil {
+ errs <- err
+
+ return
+ }
+ results <- b
+ }
+ })
+ }
+ wg.Wait()
+ close(results)
+ close(errs)
+
+ for err := range errs {
+ t.Fatalf("concurrent encode failed: %v", err)
+ }
+ for b := range results {
+ require.Equal(t, want, b, "concurrent encodes must produce identical bytes")
+ }
+}
+
+func fullyPopulatedDataFile(t *testing.T, version int) (iceberg.PartitionSpec, *iceberg.Schema, iceberg.DataFile) {
+ t.Helper()
+ schema := iceberg.NewSchema(123,
+ iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+ iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.StringType{}},
+ )
+ spec := iceberg.NewPartitionSpecID(7,
+ iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+ )
+ builder, err := iceberg.NewDataFileBuilder(
+ spec,
+ iceberg.EntryContentData,
+ "s3://bucket/ns/tbl/data/part-0000.parquet",
+ iceberg.ParquetFile,
+ map[int]any{1000: int64(42)},
+ map[int]string{},
+ map[int]int{},
+ 1024,
+ 1024*1024,
+ )
+ require.NoError(t, err)
+ builder.
+ ColumnSizes(map[int]int64{1: 512, 2: 256}).
+ ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+ NullValueCounts(map[int]int64{1: 0, 2: 4}).
+ NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+ LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+ UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+ SplitOffsets([]int64{0, 4096}).
+ SortOrderID(0).
+ KeyMetadata([]byte("kms-key-1"))
+ if version < 3 {
+ // distinct_counts is deprecated for all versions in the spec
+ // (apache/iceberg#12182). The fixture sets it on v1/v2 to
+ // exercise the read-compatibility round-trip path — counts
+ // already present on a DataFile read from a legacy manifest
+ // must survive an encode→decode cycle. New DataFiles should
+ // not carry distinct counts.
+ builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})
+ }
+ if version >= 2 {
+ builder.EqualityFieldIDs([]int{1})
+ }
+ if version >= 3 {
+ builder.FirstRowID(0).
+ ReferencedDataFile("s3://bucket/ns/tbl/data/source.parquet").
+ ContentOffset(128).
+ ContentSizeInBytes(2048)
+ }
+
+ return spec, schema, builder.Build()
+}
+
+func assertDataFileEqual(t *testing.T, want, got iceberg.DataFile, version int) {
+ t.Helper()
+ require.Equal(t, want.FilePath(), got.FilePath())
+ require.Equal(t, want.FileFormat(), got.FileFormat())
+ require.Equal(t, want.Partition(), got.Partition())
+ require.Equal(t, want.Count(), got.Count())
+ require.Equal(t, want.FileSizeBytes(), got.FileSizeBytes())
+ require.Equal(t, want.ColumnSizes(), got.ColumnSizes())
+ require.Equal(t, want.ValueCounts(), got.ValueCounts())
+ require.Equal(t, want.NullValueCounts(), got.NullValueCounts())
+ require.Equal(t, want.NaNValueCounts(), got.NaNValueCounts())
+ require.Equal(t, want.LowerBoundValues(), got.LowerBoundValues())
+ require.Equal(t, want.UpperBoundValues(), got.UpperBoundValues())
+ require.Equal(t, want.KeyMetadata(), got.KeyMetadata())
+ require.Equal(t, want.SplitOffsets(), got.SplitOffsets())
+ require.Equal(t, want.SortOrderID(), got.SortOrderID())
+ require.Equal(t, want.SpecID(), got.SpecID())
+ require.Equal(t, want.ContentType(), got.ContentType())
+ if version < 3 {
+ // distinct_counts (field 111) is deprecated for all versions
+ // but still writable on v1/v2. The codec preserves it on
+ // round-trip for read compatibility with legacy manifests; v3
+ // drops it per spec (apache/iceberg#12182).
+ require.Equal(t, want.DistinctValueCounts(), got.DistinctValueCounts())
+ } else {
+ require.Empty(t, got.DistinctValueCounts(),
+ "v3 manifest-entry schema omits distinct_counts (deprecated in spec); "+
+ "see internal/avro_schemas.go data_file_v3")
+ }
+ if version >= 2 {
+ require.Equal(t, want.EqualityFieldIDs(), got.EqualityFieldIDs())
+ }
+ if version >= 3 {
+ require.Equal(t, want.FirstRowID(), got.FirstRowID())
+ require.Equal(t, want.ReferencedDataFile(), got.ReferencedDataFile())
+ require.Equal(t, want.ContentOffset(), got.ContentOffset())
+ require.Equal(t, want.ContentSizeInBytes(), got.ContentSizeInBytes())
+ }
+
+ require.Equal(t, expectedDataFileMethods, dataFileInterfaceMethods(),
+ "DataFile interface drifted: either extend [codec.EncodeDataFile]/[codec.DecodeDataFile] "+
+ "and the assertions above to cover the change, or update expectedDataFileMethods "+
+ "with a comment explaining why the new method is intentionally not transported")
+}
+
+// expectedDataFileMethods is the sorted list of methods the
+// [iceberg.DataFile] interface is known to export. The round-trip test
+// asserts this against runtime reflection; any drift here forces a
+// deliberate decision about whether the new method needs to be
+// round-tripped by [codec.EncodeDataFile] / [codec.DecodeDataFile].
+var expectedDataFileMethods = []string{
+ "ColumnSizes",
+ "ContentOffset",
+ "ContentSizeInBytes",
+ "ContentType",
+ "Count",
+ "DistinctValueCounts",
+ "EqualityFieldIDs",
+ "FileFormat",
+ "FilePath",
+ "FileSizeBytes",
+ "FirstRowID",
+ "KeyMetadata",
+ "LowerBoundValues",
+ "NaNValueCounts",
+ "NullValueCounts",
+ "Partition",
+ "ReferencedDataFile",
+ "SortOrderID",
+ "SpecID",
+ "SplitOffsets",
+ "UpperBoundValues",
+ "ValueCounts",
+}
+
+// dataFileInterfaceMethods returns the sorted names of every method on
+// the DataFile interface. If a new method is added upstream the slice
+// changes; the round-trip test then enforces it is covered by an
+// explicit assertion above.
+func dataFileInterfaceMethods() []string {
+ t := reflect.TypeOf((*iceberg.DataFile)(nil)).Elem()
+ out := make([]string, 0, t.NumMethod())
+ for i := 0; i < t.NumMethod(); i++ {
+ out = append(out, t.Method(i).Name)
+ }
+ sort.Strings(out)
+
+ return out
+}
+
+type stubDataFile struct{}
+
+func (stubDataFile) ContentType() iceberg.ManifestEntryContent { return iceberg.EntryContentData }
+func (stubDataFile) FilePath() string { return "" }
+func (stubDataFile) FileFormat() iceberg.FileFormat { return iceberg.ParquetFile }
+func (stubDataFile) Partition() map[int]any { return nil }
+func (stubDataFile) Count() int64 { return 0 }
+func (stubDataFile) FileSizeBytes() int64 { return 0 }
+func (stubDataFile) ColumnSizes() map[int]int64 { return nil }
+func (stubDataFile) ValueCounts() map[int]int64 { return nil }
+func (stubDataFile) NullValueCounts() map[int]int64 { return nil }
+func (stubDataFile) NaNValueCounts() map[int]int64 { return nil }
+func (stubDataFile) DistinctValueCounts() map[int]int64 { return nil }
+func (stubDataFile) LowerBoundValues() map[int][]byte { return nil }
+func (stubDataFile) UpperBoundValues() map[int][]byte { return nil }
+func (stubDataFile) KeyMetadata() []byte { return nil }
+func (stubDataFile) SplitOffsets() []int64 { return nil }
+func (stubDataFile) EqualityFieldIDs() []int { return nil }
+func (stubDataFile) SortOrderID() *int { return nil }
+func (stubDataFile) SpecID() int32 { return 0 }
+func (stubDataFile) FirstRowID() *int64 { return nil }
+func (stubDataFile) ReferencedDataFile() *string { return nil }
+func (stubDataFile) ContentOffset() *int64 { return nil }
+func (stubDataFile) ContentSizeInBytes() *int64 { return nil }
diff --git a/codec/file_scan_task.go b/codec/file_scan_task.go
new file mode 100644
index 00000000..8d78703a
--- /dev/null
+++ b/codec/file_scan_task.go
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec
+
+import (
+ "fmt"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table"
+ "github.com/twmb/avro"
+)
+
+// EncodeFileScanTask encodes a FileScanTask for cross-process transport.
+// Each carried DataFile is encoded with [EncodeDataFile] and wrapped in
+// a small record that also carries the scan range and v3 row lineage.
+// The (spec, schema, version) triple must match what [DecodeFileScanTask]
+// is given on the receiver.
+//
+// All carried DataFiles (data, positional deletes, equality deletes,
+// and deletion vectors) must share the supplied spec.ID(): each delete
+// file's SpecID is validated and a mismatch returns an error. After
+// partition evolution, delete files may have been written under a
+// different partition spec than the data file; the caller is
+// responsible for partitioning the FileScanTask by per-file specID and
+// calling EncodeFileScanTask once per group.
+func EncodeFileScanTask(task table.FileScanTask, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) {
+ if version < 1 || version > 3 {
+ return nil, fmt.Errorf("codec: EncodeFileScanTask: unsupported format version %d", version)
+ }
+ fileBytes, err := EncodeDataFile(task.File, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("file: %w", err)
+ }
+ del, err := encodeDataFileSlice(task.DeleteFiles, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("delete files: %w", err)
+ }
+ eq, err := encodeDataFileSlice(task.EqualityDeleteFiles, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("equality delete files: %w", err)
+ }
+ dv, err := encodeDataFileSlice(task.DeletionVectorFiles, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("deletion vector files: %w", err)
+ }
+ envelope := fileScanTaskEnvelope{
+ File: fileBytes,
+ DeleteFiles: del,
+ EqualityDeleteFiles: eq,
+ DeletionVectorFiles: dv,
+ Start: task.Start,
+ Length: task.Length,
+ FirstRowID: task.FirstRowID,
+ DataSequenceNumber: task.DataSequenceNumber,
+ }
+
+ return fileScanTaskSchema.Encode(&envelope)
+}
+
+// DecodeFileScanTask reverses [EncodeFileScanTask]. The triple
+// (spec, schema, version) must match the encoder.
+func DecodeFileScanTask(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (table.FileScanTask, error) {
+ if version < 1 || version > 3 {
+ return table.FileScanTask{}, fmt.Errorf("codec: DecodeFileScanTask: unsupported format version %d", version)
+ }
+ var envelope fileScanTaskEnvelope
+ if _, err := fileScanTaskSchema.Decode(data, &envelope); err != nil {
+ return table.FileScanTask{}, fmt.Errorf("decode: %w", err)
+ }
+ file, err := DecodeDataFile(envelope.File, spec, schema, version)
+ if err != nil {
+ return table.FileScanTask{}, fmt.Errorf("file: %w", err)
+ }
+ del, err := decodeDataFileSlice(envelope.DeleteFiles, spec, schema, version)
+ if err != nil {
+ return table.FileScanTask{}, fmt.Errorf("delete files: %w", err)
+ }
+ eq, err := decodeDataFileSlice(envelope.EqualityDeleteFiles, spec, schema, version)
+ if err != nil {
+ return table.FileScanTask{}, fmt.Errorf("equality delete files: %w", err)
+ }
+ dv, err := decodeDataFileSlice(envelope.DeletionVectorFiles, spec, schema, version)
+ if err != nil {
+ return table.FileScanTask{}, fmt.Errorf("deletion vector files: %w", err)
+ }
+
+ return table.FileScanTask{
+ File: file,
+ DeleteFiles: del,
+ EqualityDeleteFiles: eq,
+ DeletionVectorFiles: dv,
+ Start: envelope.Start,
+ Length: envelope.Length,
+ FirstRowID: envelope.FirstRowID,
+ DataSequenceNumber: envelope.DataSequenceNumber,
+ }, nil
+}
+
+// fileScanTaskShape is a compile-time drift guard for FileScanTask.
+// Go only permits struct conversion between two types that have
+// identical underlying field sequences (names, types, and order; tags
+// are ignored), so the var _ below fails to build the moment
+// table.FileScanTask gains, loses, renames, retypes, or reorders a
+// field. That forces a deliberate decision about whether the change
+// must be carried by [EncodeFileScanTask] / [DecodeFileScanTask]; when
+// extending, update fileScanTaskEnvelope, the schema JSON below, the
+// encode/decode bodies, and this shape together.
+type fileScanTaskShape struct {
+ File iceberg.DataFile
+ DeleteFiles []iceberg.DataFile
+ EqualityDeleteFiles []iceberg.DataFile
+ DeletionVectorFiles []iceberg.DataFile
+ Start, Length int64
+ FirstRowID *int64
+ DataSequenceNumber *int64
+}
+
+var _ = table.FileScanTask(fileScanTaskShape{})
+
+// fileScanTaskEnvelope is the avro on-wire shape. The DataFile payloads
+// (File and the three delete-file lists) are themselves [EncodeDataFile]
+// bytes; this struct only frames them along with the scan range and v3
+// lineage.
+type fileScanTaskEnvelope struct {
+ File []byte `avro:"file"`
+ DeleteFiles [][]byte `avro:"delete_files"`
+ EqualityDeleteFiles [][]byte `avro:"equality_delete_files"`
+ DeletionVectorFiles [][]byte `avro:"deletion_vector_files"`
+ Start int64 `avro:"start"`
+ Length int64 `avro:"length"`
+ FirstRowID *int64 `avro:"first_row_id"`
+ DataSequenceNumber *int64 `avro:"data_sequence_number"`
+}
+
+const fileScanTaskSchemaJSON = `{
+ "type": "record",
+ "name": "file_scan_task",
+ "fields": [
+ {"name": "file", "type": "bytes"},
+ {"name": "delete_files", "type": {"type": "array", "items": "bytes"}},
+ {"name": "equality_delete_files", "type": {"type": "array", "items": "bytes"}},
+ {"name": "deletion_vector_files", "type": {"type": "array", "items": "bytes"}},
+ {"name": "start", "type": "long"},
+ {"name": "length", "type": "long"},
+ {"name": "first_row_id", "type": ["null", "long"]},
+ {"name": "data_sequence_number", "type": ["null", "long"]}
+ ]
+}`
+
+var fileScanTaskSchema *avro.Schema
+
+func init() {
+ s, err := avro.Parse(fileScanTaskSchemaJSON)
+ if err != nil {
+ panic("codec: fileScanTaskSchema invalid: " + err.Error())
+ }
+ fileScanTaskSchema = s
+}
+
+func encodeDataFileSlice(files []iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([][]byte, error) {
+ if len(files) == 0 {
+ return nil, nil
+ }
+ out := make([][]byte, 0, len(files))
+ for i, f := range files {
+ if int(f.SpecID()) != spec.ID() {
+ return nil, fmt.Errorf("entry %d: data file spec id %d does not match codec spec id %d (partition evolution requires per-spec grouping)", i, f.SpecID(), spec.ID())
+ }
+ b, err := EncodeDataFile(f, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("entry %d: %w", i, err)
+ }
+ out = append(out, b)
+ }
+
+ return out, nil
+}
+
+func decodeDataFileSlice(blobs [][]byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]iceberg.DataFile, error) {
+ if len(blobs) == 0 {
+ return nil, nil
+ }
+ out := make([]iceberg.DataFile, 0, len(blobs))
+ for i, b := range blobs {
+ df, err := DecodeDataFile(b, spec, schema, version)
+ if err != nil {
+ return nil, fmt.Errorf("entry %d: %w", i, err)
+ }
+ out = append(out, df)
+ }
+
+ return out, nil
+}
diff --git a/codec/file_scan_task_test.go b/codec/file_scan_task_test.go
new file mode 100644
index 00000000..8775ebd8
--- /dev/null
+++ b/codec/file_scan_task_test.go
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+ "strconv"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/codec"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeFileScanTaskRoundTrip(t *testing.T) {
+ for _, version := range []int{1, 2, 3} {
+ t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+ spec, schema, original := fullyPopulatedFileScanTask(t, version)
+
+ bytes, err := codec.EncodeFileScanTask(original, spec, schema, version)
+ require.NoError(t, err)
+ require.NotEmpty(t, bytes)
+
+ decoded, err := codec.DecodeFileScanTask(bytes, spec, schema, version)
+ require.NoError(t, err)
+
+ require.Equal(t, original.File.FilePath(),
decoded.File.FilePath())
+ require.Equal(t, original.File.Count(),
decoded.File.Count())
+ require.Equal(t, original.File.Partition(),
decoded.File.Partition())
+
+ require.Len(t, decoded.DeleteFiles, len(original.DeleteFiles))
+ for i := range original.DeleteFiles {
+ require.Equal(t, original.DeleteFiles[i].FilePath(), decoded.DeleteFiles[i].FilePath())
+ }
+ require.Len(t, decoded.EqualityDeleteFiles, len(original.EqualityDeleteFiles))
+ for i := range original.EqualityDeleteFiles {
+ require.Equal(t, original.EqualityDeleteFiles[i].FilePath(), decoded.EqualityDeleteFiles[i].FilePath())
+ }
+ require.Len(t, decoded.DeletionVectorFiles, len(original.DeletionVectorFiles))
+ for i := range original.DeletionVectorFiles {
+ require.Equal(t, original.DeletionVectorFiles[i].FilePath(), decoded.DeletionVectorFiles[i].FilePath(),
+ "DV file path must round-trip")
+ require.Equal(t, original.DeletionVectorFiles[i].ReferencedDataFile(), decoded.DeletionVectorFiles[i].ReferencedDataFile(),
+ "DV file must remember the data file it deletes from")
+ require.Equal(t, original.DeletionVectorFiles[i].ContentOffset(), decoded.DeletionVectorFiles[i].ContentOffset(),
+ "DV file content offset (puffin blob offset) must round-trip")
+ require.Equal(t, original.DeletionVectorFiles[i].ContentSizeInBytes(), decoded.DeletionVectorFiles[i].ContentSizeInBytes(),
+ "DV file content size must round-trip")
+ require.Equal(t, original.DeletionVectorFiles[i].FileFormat(), decoded.DeletionVectorFiles[i].FileFormat(),
+ "DV file format must round-trip")
+ }
+
+ require.Equal(t, original.Start, decoded.Start)
+ require.Equal(t, original.Length, decoded.Length)
+ require.Equal(t, original.FirstRowID, decoded.FirstRowID)
+ require.Equal(t, original.DataSequenceNumber, decoded.DataSequenceNumber)
+ })
+ }
+}
+
+func TestEncodeFileScanTaskRejectsMismatchedDeleteFileSpec(t *testing.T) {
+ schema := iceberg.NewSchema(123,
+ iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+ )
+ specMain := iceberg.NewPartitionSpecID(7,
+ iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+ )
+ specOther := iceberg.NewPartitionSpecID(99,
+ iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+ )
+
+ mainFile := newScanTaskDataFile(t, specMain, "s3://bucket/main.parquet",
+ iceberg.EntryContentData, iceberg.ParquetFile, "", 2)
+ mismatchedDelete := newScanTaskDataFile(t, specOther, "s3://bucket/del.parquet",
+ iceberg.EntryContentPosDeletes, iceberg.ParquetFile, "", 2)
+
+ task := table.FileScanTask{
+ File: mainFile,
+ DeleteFiles: []iceberg.DataFile{mismatchedDelete},
+ }
+ _, err := codec.EncodeFileScanTask(task, specMain, schema, 2)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "spec id",
+ "error must call out the spec-id mismatch")
+ require.Contains(t, err.Error(), "99",
+ "error must include the offending delete file's spec id")
+ require.Contains(t, err.Error(), "7",
+ "error must include the codec's spec id")
+}
+
+func TestEncodeFileScanTaskEmptyDeleteLists(t *testing.T) {
+ spec, schema, task := fullyPopulatedFileScanTask(t, 2)
+ task.DeleteFiles = nil
+ task.EqualityDeleteFiles = nil
+ task.DeletionVectorFiles = nil
+
+ bytes, err := codec.EncodeFileScanTask(task, spec, schema, 2)
+ require.NoError(t, err)
+
+ decoded, err := codec.DecodeFileScanTask(bytes, spec, schema, 2)
+ require.NoError(t, err)
+ require.Empty(t, decoded.DeleteFiles)
+ require.Empty(t, decoded.EqualityDeleteFiles)
+ require.Empty(t, decoded.DeletionVectorFiles)
+}
+
+func fullyPopulatedFileScanTask(t *testing.T, version int) (iceberg.PartitionSpec, *iceberg.Schema, table.FileScanTask) {
+ t.Helper()
+ schema := iceberg.NewSchema(123,
+ iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+ )
+ spec := iceberg.NewPartitionSpecID(7,
+ iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+ )
+
+ file := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/part-0000.parquet", iceberg.EntryContentData, iceberg.ParquetFile, "", version)
+ delete1 := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/del-0001.parquet", iceberg.EntryContentPosDeletes, iceberg.ParquetFile, "", version)
+
+ firstRow := int64(0)
+ dataSeq := int64(17)
+
+ task := table.FileScanTask{
+ File: file,
+ DeleteFiles: []iceberg.DataFile{delete1},
+ Start: 0,
+ Length: 1024 * 1024,
+ FirstRowID: &firstRow,
+ DataSequenceNumber: &dataSeq,
+ }
+
+ if version >= 2 {
+ eq1 := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/eq-0001.parquet", iceberg.EntryContentEqDeletes, iceberg.ParquetFile, "", version)
+ task.EqualityDeleteFiles = []iceberg.DataFile{eq1}
+ }
+
+ if version >= 3 {
+ // Deletion vectors are v3's first-class delete mechanism: a
+ // Puffin blob that references the data file whose rows it
+ // deletes. The round-trip test asserts the DV-specific fields
+ // (file format, ReferencedDataFile, ContentOffset,
+ // ContentSizeInBytes) so a regression in the DV encode path surfaces.
+ dv := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/dv-0001.puffin",
+ iceberg.EntryContentPosDeletes, iceberg.PuffinFile, file.FilePath(), version)
+ task.DeletionVectorFiles = []iceberg.DataFile{dv}
+ }
+
+ return spec, schema, task
+}
+
+func newScanTaskDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, content iceberg.ManifestEntryContent, format iceberg.FileFormat, referencedDataFile string, version int) iceberg.DataFile {
+ t.Helper()
+ builder, err := iceberg.NewDataFileBuilder(
+ spec,
+ content,
+ path,
+ format,
+ map[int]any{1000: int64(42)},
+ map[int]string{},
+ map[int]int{},
+ 1024,
+ 1024*1024,
+ )
+ require.NoError(t, err)
+ builder.
+ ColumnSizes(map[int]int64{1: 512}).
+ ValueCounts(map[int]int64{1: 1024}).
+ NullValueCounts(map[int]int64{1: 0}).
+ LowerBoundValues(map[int][]byte{1: {0x01}}).
+ UpperBoundValues(map[int][]byte{1: {0xff}})
+ if content == iceberg.EntryContentEqDeletes {
+ builder.EqualityFieldIDs([]int{1})
+ }
+ if version >= 3 {
+ builder.FirstRowID(0)
+ }
+ if referencedDataFile != "" {
+ builder.ReferencedDataFile(referencedDataFile).
+ ContentOffset(128).
+ ContentSizeInBytes(2048)
+ }
+
+ return builder.Build()
+}
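TestEncodeFileScanTaskRejectsMismatchedDeleteFileSpec expects EncodeFileScanTask to refuse a delete file whose partition spec id differs from the spec handed to the codec: every embedded DataFile is encoded against that single (spec, schema, version) triple, so a file written under another spec would be encoded with the wrong partition schema. The guard itself is not part of this excerpt. The following is a minimal sketch of such a check, assuming a hypothetical fileRef type in place of iceberg.DataFile and an illustrative error wording; it is not the codec package's actual implementation.

```go
package main

import "fmt"

// fileRef is a hypothetical stand-in for iceberg.DataFile, reduced to
// the two properties this check looks at.
type fileRef struct {
	Path   string
	SpecID int
}

// checkDeleteFileSpecs returns an error for the first delete file whose
// partition spec id differs from the spec id the codec was given. Both
// ids appear in the message, which is what the test asserts on.
func checkDeleteFileSpecs(codecSpecID int, deletes []fileRef) error {
	for _, d := range deletes {
		if d.SpecID != codecSpecID {
			return fmt.Errorf("delete file %s has spec id %d, but the codec was given spec id %d",
				d.Path, d.SpecID, codecSpecID)
		}
	}
	return nil
}

func main() {
	err := checkDeleteFileSpecs(7, []fileRef{{Path: "s3://bucket/del.parquet", SpecID: 99}})
	fmt.Println(err)
	// delete file s3://bucket/del.parquet has spec id 99, but the codec was given spec id 7
}
```

Failing on the first mismatch keeps the error specific; a caller that wants every offending file would collect them instead.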
diff --git a/data_file_codec.go b/data_file_codec.go
new file mode 100644
index 00000000..15706b7e
--- /dev/null
+++ b/data_file_codec.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+ "fmt"
+ "reflect"
+
+ "github.com/apache/iceberg-go/internal"
+ "github.com/apache/iceberg-go/internal/datafileavro"
+ lru "github.com/hashicorp/golang-lru/v2"
+ "github.com/twmb/avro"
+)
+
+// defaultSchemaCacheSize is the initial capacity of dataFileSchemaCache.
+// 8192 covers a few thousand active tables (each contributing 1–3 entries
+// for its partition spec × format version), which is what a long-running
+// consumer such as a server-side compaction service typically sees.
+// SetSchemaCacheSize tunes this at runtime.
+const defaultSchemaCacheSize = 8192
+
+func init() {
+ if datafileavro.Unmarshal != nil {
+ panic("iceberg: datafileavro.Unmarshal already set")
+ }
+ datafileavro.Unmarshal = func(data []byte, spec, schema any, version int) (any, error) {
+ s, ok := spec.(PartitionSpec)
+ if !ok {
+ return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected PartitionSpec, got %T", spec)
+ }
+ sc, ok := schema.(*Schema)
+ if !ok {
+ return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected *Schema, got %T", schema)
+ }
+
+ return unmarshalAvroDataFileEntry(data, s, sc, version)
+ }
+}
+
+// AvroEntryMarshaler is implemented by DataFile values that can be
+// encoded using the manifest-entry Avro encoding. The iceberg
+// package's built-in DataFile implementation satisfies it; external
+// implementations can also satisfy it to participate in the
+// [github.com/apache/iceberg-go/codec] DataFile codec.
+//
+// The encoded bytes are the same bytes a manifest carries for this
+// data file. Implementations must produce output that the iceberg
+// package's manifest-entry Avro decoder accepts.
+type AvroEntryMarshaler interface {
+ MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error)
+}
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the iceberg-package side of the
+// [github.com/apache/iceberg-go/codec] DataFile codec; callers
+// performing cross-process transport should prefer that package's
+// high-level API.
+//
+// MarshalAvroEntry is safe to call concurrently with any other
+// reader or encoder of the same DataFile: a fresh *dataFile is
+// cloned (avro-tagged fields only) and the avro encoder reads, but
+// does not mutate, the cloned values. Pointer-typed avro fields like
+// ColSizes share their backing storage with the source; the
+// thread-safety guarantee relies on the avro encoder being
+// non-mutating.
+//
+// v1 note: the v1 manifest-entry schema has a non-nullable snapshot_id
+// field. MarshalAvroEntry writes 0 there, so v1 bytes are not usable
+// as a standalone manifest entry — they only round-trip via the
+// matching decoder.
+//
+// distinct_counts (field 111) is deprecated in the spec for all
+// versions. MarshalAvroEntry preserves any value already on the
+// source for v1 and v2 as a read-compatibility artifact; v3 omits
+// the field entirely (apache/iceberg#12182). New DataFiles should
+// not set distinct counts.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error) {
+ if version < 1 || version > 3 {
+ return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported format version %d", version)
+ }
+ s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+ if err != nil {
+ return nil, err
+ }
+ clone := cloneDataFileAvroFields(d)
+ clone.PartitionData = avroEncodePartitionData(d.Partition(), maps.nameToID, maps.idToType)
+
+ return s.Encode(newEncodeEntry(version, clone))
+}
+
+// unmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+//
+// It is reachable from the [github.com/apache/iceberg-go/codec]
+// package through the [datafileavro] bridge.
+func unmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema *Schema, version int) (DataFile, error) {
+ if version < 1 || version > 3 {
+ return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: unsupported format version %d", version)
+ }
+ s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+ if err != nil {
+ return nil, err
+ }
+ entry, df := newDecodeEntry(version)
+ if _, err := s.Decode(data, entry); err != nil {
+ return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: %w", err)
+ }
+ df.specID = int32(spec.ID())
+ df.fieldNameToID = maps.nameToID
+ df.fieldIDToLogicalType = maps.idToType
+ df.fieldIDToFixedSize = maps.idToFixedSize
+
+ return df, nil
+}
+
+// newEncodeEntry returns the right manifest-entry shape for the schema
+// version: v1's manifest_entry has a non-nullable snapshot_id and uses
+// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable
+// pointers.
+func newEncodeEntry(version int, df *dataFile) any {
+ if version == 1 {
+ return &fallbackManifestEntry{
+ manifestEntry: manifestEntry{EntryStatus: EntryStatusADDED, Data: df},
+ }
+ }
+
+ return &manifestEntry{EntryStatus: EntryStatusADDED, Data: df}
+}
+
+// newDecodeEntry mirrors [newEncodeEntry] for the read side: it returns
+// the pointer to pass to avro.Schema.Decode along with the pre-allocated
+// *dataFile that will be populated.
+func newDecodeEntry(version int) (any, *dataFile) {
+ df := &dataFile{}
+ if version == 1 {
+ return &fallbackManifestEntry{manifestEntry: manifestEntry{Data: df}}, df
+ }
+
+ return &manifestEntry{Data: df}, df
+}
+
+// cloneDataFileAvroFields returns a fresh *dataFile populated with src's
+// avro-tagged fields. Internal state (sync.Once, lazy-init caches,
+// specID, the field-id lookup maps) is intentionally left at zero
+// values because the avro encoder reads only the avro-tagged fields.
+//
+// Using reflection over the tag set means a new avro-tagged field
+// upstream is auto-copied without an update here — the dataFile struct
+// remains the single source of truth for the wire shape. It also
+// sidesteps the go-vet copies-lock warning that would fire on a
+// struct-literal copy of *dataFile (it embeds sync.Once).
+//
+// Note: this is a shallow copy. Pointer-typed avro fields (ColSizes,
+// LowerBounds, etc.) share their backing storage with the source.
+// The no-mutation guarantee of MarshalAvroEntry depends on the avro
+// encoder being read-only on the values it walks;
+// TestMarshalAvroEntryDoesNotMutateAnyAvroField asserts this end-to-end
+// across every avro-tagged field, so a future regression in the encoder
+// surfaces in tests.
+func cloneDataFileAvroFields(src *dataFile) *dataFile {
+ out := &dataFile{}
+ srcVal := reflect.ValueOf(src).Elem()
+ outVal := reflect.ValueOf(out).Elem()
+ t := srcVal.Type()
+ for i := 0; i < t.NumField(); i++ {
+ if _, hasAvroTag := t.Field(i).Tag.Lookup("avro"); hasAvroTag {
+ outVal.Field(i).Set(srcVal.Field(i))
+ }
+ }
+
+ return out
+}
+
+// avroEncodePartitionData converts an id-keyed partition tuple (carrying
+// iceberg-typed values like Date or Decimal) into the name-keyed
+// avro-friendly map the manifest-entry schema expects. Idempotent:
+// values already in primitive form pass through unchanged.
+func avroEncodePartitionData(idKeyed map[int]any, nameToID map[string]int, logicalTypes map[int]string) map[string]any {
+ converted := avroPartitionData(idKeyed, logicalTypes)
+ out := make(map[string]any, len(converted))
+ for name, id := range nameToID {
+ if v, ok := converted[id]; ok {
+ out[name] = v
+ }
+ }
+
+ return out
+}
+
+type dataFileFieldMaps struct {
+ nameToID map[string]int
+ idToType map[int]string
+ idToFixedSize map[int]int
+}
+
+// dataFileSchemaCacheKey identifies a cached avro schema by the
+// structural fingerprint of the partition Avro shape and the format
+// version. The fingerprint is taken from the avro schema produced by
+// [partitionTypeToAvroSchema] rather than [StructType.String]: the
+// avro shape ignores doc strings and other metadata that don't change
+// the wire format, so structurally identical specs that differ only
+// in documentation share a single cache entry.
+type dataFileSchemaCacheKey struct {
+ partAvroFingerprint string
+ version int
+}
+
+type dataFileSchemaEntry struct {
+ schema *avro.Schema
+ maps dataFileFieldMaps
+}
+
+var dataFileSchemaCache = mustNewSchemaCache(defaultSchemaCacheSize)
+
+func mustNewSchemaCache(size int) *lru.Cache[dataFileSchemaCacheKey, *dataFileSchemaEntry] {
+ c, err := lru.New[dataFileSchemaCacheKey, *dataFileSchemaEntry](size)
+ if err != nil {
+ panic(fmt.Sprintf("iceberg: schema cache size %d invalid: %v", size, err))
+ }
+
+ return c
+}
+
+// SetSchemaCacheSize resizes the manifest-entry schema cache used by
+// the DataFile codec. The default capacity is sized for a few thousand
+// active partition specs; long-running consumers with larger working
+// sets (e.g. a compaction service touching many tables) should raise
+// it. Existing entries are preserved on grow; on shrink, least-recently
+// used entries are evicted down to the new size. Not safe to call
+// concurrently with codec operations.
+func SetSchemaCacheSize(size int) error {
+ if size <= 0 {
+ return fmt.Errorf("iceberg: SetSchemaCacheSize: size must be positive, got %d", size)
+ }
+ dataFileSchemaCache.Resize(size)
+
+ return nil
+}
+
+// manifestEntrySchemaFor returns the cached avro schema and partition
+// field-id lookups for the given partition type and format version.
+// The cache key fingerprints the partition Avro shape, so specs that
+// differ only in field documentation share a single entry.
+func manifestEntrySchemaFor(spec PartitionSpec, schema *Schema, version int) (*avro.Schema, dataFileFieldMaps, error) {
+ partType := spec.PartitionType(schema)
+ partSchema, err := partitionTypeToAvroSchema(partType)
+ if err != nil {
+ return nil, dataFileFieldMaps{}, err
+ }
+ key := dataFileSchemaCacheKey{partAvroFingerprint: partSchema.String(), version: version}
+ if cached, ok := dataFileSchemaCache.Get(key); ok {
+ return cached.schema, cached.maps, nil
+ }
+ fullSchema, err := internal.NewManifestEntrySchema(partSchema, version)
+ if err != nil {
+ return nil, dataFileFieldMaps{}, err
+ }
+ n2i, i2t, i2s := getFieldIDMap(fullSchema)
+ entry := &dataFileSchemaEntry{
+ schema: fullSchema,
+ maps: dataFileFieldMaps{
+ nameToID: n2i,
+ idToType: i2t,
+ idToFixedSize: i2s,
+ },
+ }
+ dataFileSchemaCache.Add(key, entry)
+
+ return entry.schema, entry.maps, nil
+}
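cloneDataFileAvroFields copies only the avro-tagged fields of a dataFile through reflection, leaving internal state at its zero value and avoiding a struct-literal copy of the embedded sync.Once. The standalone sketch below reproduces that idea with a generic helper and a hypothetical record type (neither is part of iceberg-go), and makes the shallow-copy caveat from the doc comment observable: maps in the clone alias the source.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// record is a hypothetical struct shaped like dataFile: exported fields
// carry avro tags, while unexported fields hold internal state.
type record struct {
	Path   string        `avro:"file_path"`
	Counts map[int]int64 `avro:"value_counts"`

	once  sync.Once      // copying this by value would trip go vet's copylocks check
	cache map[string]int // lazy cache; stays nil in the clone
}

// cloneTagged allocates a fresh *T and copies only the avro-tagged
// fields from src. T must be a struct whose tagged fields are exported,
// otherwise reflect's Set panics. The copy is shallow: maps, slices and
// pointers in the clone alias src's backing storage.
func cloneTagged[T any](src *T) *T {
	out := new(T)
	sv := reflect.ValueOf(src).Elem()
	ov := reflect.ValueOf(out).Elem()
	st := sv.Type()
	for i := 0; i < st.NumField(); i++ {
		if _, ok := st.Field(i).Tag.Lookup("avro"); ok {
			ov.Field(i).Set(sv.Field(i))
		}
	}
	return out
}

func main() {
	src := &record{
		Path:   "s3://bucket/a.parquet",
		Counts: map[int]int64{1: 10},
		cache:  map[string]int{"x": 1},
	}
	c := cloneTagged(src)
	fmt.Println(c.Path, c.Counts[1], c.cache == nil) // s3://bucket/a.parquet 10 true

	// Shallow copy: a write through the clone's map is visible on src,
	// which is why the encoder must only read the values it walks.
	c.Counts[1] = 99
	fmt.Println(src.Counts[1]) // 99
}
```

Selecting fields by tag rather than listing them keeps the struct definition as the single source of truth, at the cost of a reflection walk per encode.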
diff --git a/data_file_codec_test.go b/data_file_codec_test.go
new file mode 100644
index 00000000..c277963b
--- /dev/null
+++ b/data_file_codec_test.go
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+ "reflect"
+ "strconv"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestSchemaCacheLRUBoundsSize(t *testing.T) {
+ require.NoError(t, SetSchemaCacheSize(2))
+ t.Cleanup(func() { require.NoError(t, SetSchemaCacheSize(defaultSchemaCacheSize)) })
+
+ sc := NewSchema(0,
+ NestedField{ID: 1, Name: "a", Type: Int64Type{}, Required: true},
+ NestedField{ID: 2, Name: "b", Type: StringType{}, Required: true},
+ NestedField{ID: 3, Name: "c", Type: BooleanType{}, Required: true},
+ )
+ for i, src := range []int{1, 2, 3} {
+ spec := NewPartitionSpec(PartitionField{
+ SourceIDs: []int{src},
+ FieldID: 1000 + i,
+ Name: "part_" + strconv.Itoa(src),
+ Transform: IdentityTransform{},
+ })
+ _, _, err := manifestEntrySchemaFor(spec, sc, 2)
+ require.NoError(t, err)
+ }
+ require.LessOrEqual(t, dataFileSchemaCache.Len(), 2,
+ "LRU must enforce its size bound by evicting least-recently-used entries on insert")
+}
+
+func TestSetSchemaCacheSizeRejectsNonPositive(t *testing.T) {
+ for _, size := range []int{0, -1} {
+ require.Error(t, SetSchemaCacheSize(size),
+ "size %d must be rejected", size)
+ }
+}
+
+func TestManifestEntrySchemaForCachesByPartitionAvroFingerprint(t *testing.T) {
+ spec, schema, _ := fullyPopulatedDataFileForCodec(t, 2)
+ s1, _, err := manifestEntrySchemaFor(spec, schema, 2)
+ require.NoError(t, err)
+ s2, _, err := manifestEntrySchemaFor(spec, schema, 2)
+ require.NoError(t, err)
+ require.Same(t, s1, s2,
+ "identical (partition type, version) inputs must reuse the cached schema")
+}
+
+// TestManifestEntrySchemaForDistinguishesSameSpecIDDifferentPartitionTypes
+// guards against the cache-key collision that would occur if the cache
+// were keyed only by spec.ID(). Two tables can both use
+// InitialPartitionSpecID = 0 but expose different partition column
+// types; keying on the partition-type fingerprint keeps their schemas
+// separate.
+func TestManifestEntrySchemaForDistinguishesSameSpecIDDifferentPartitionTypes(t *testing.T) {
+ intSchema := NewSchema(0,
+ NestedField{ID: 1, Name: "id", Type: Int64Type{}, Required: true},
+ )
+ intSpec := NewPartitionSpec(
+ PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: IdentityTransform{}},
+ )
+ require.Equal(t, InitialPartitionSpecID, intSpec.ID(),
+ "sanity: NewPartitionSpec uses InitialPartitionSpecID")
+
+ strSchema := NewSchema(0,
+ NestedField{ID: 1, Name: "name", Type: StringType{}, Required: true},
+ )
+ strSpec := NewPartitionSpec(
+ PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "name_part", Transform: IdentityTransform{}},
+ )
+ require.Equal(t, intSpec.ID(), strSpec.ID(),
+ "sanity: both specs share the same spec id, exposing the collision risk")
+
+ intAvro, intMaps, err := manifestEntrySchemaFor(intSpec, intSchema, 2)
+ require.NoError(t, err)
+ strAvro, strMaps, err := manifestEntrySchemaFor(strSpec, strSchema, 2)
+ require.NoError(t, err)
+
+ require.NotSame(t, intAvro, strAvro,
+ "different partition column types must not share a cached avro schema")
+ require.NotEqual(t, intMaps.nameToID, strMaps.nameToID,
+ "partition field name→id maps must differ between the two specs")
+}
+
+func TestMarshalAvroEntryDoesNotMutateAnyAvroField(t *testing.T) {
+ spec, schema, df := fullyPopulatedDataFileForCodec(t, 2)
+ impl := df.(*dataFile)
+
+ before := snapshotAvroFields(impl)
+ _, err := impl.MarshalAvroEntry(spec, schema, 2)
+ require.NoError(t, err)
+ after := snapshotAvroFields(impl)
+
+ require.Equal(t, before, after,
+ "MarshalAvroEntry must not mutate any avro-tagged field of the source DataFile, "+
+ "including pointer-typed fields whose backing storage is shared with the clone")
+}
+
+// snapshotAvroFields returns a deep copy of every avro-tagged field on
+// d, keyed by field name. Slices, maps, byte arrays, and pointer
+// targets are reconstructed so the snapshot is fully independent of d
+// and stays stable across subsequent mutations of d.
+func snapshotAvroFields(d *dataFile) map[string]any {
+ out := make(map[string]any)
+ v := reflect.ValueOf(d).Elem()
+ t := v.Type()
+ for i := 0; i < t.NumField(); i++ {
+ f := t.Field(i)
+ if _, ok := f.Tag.Lookup("avro"); !ok {
+ continue
+ }
+ out[f.Name] = deepCopyReflect(v.Field(i)).Interface()
+ }
+
+ return out
+}
+
+// deepCopyReflect returns a reflect.Value whose pointer/slice/map
+// descendants do not alias src. Primitives and the inner non-mutable
+// leaves (interface payloads, function values) are copied by value.
+func deepCopyReflect(src reflect.Value) reflect.Value {
+ switch src.Kind() {
+ case reflect.Pointer:
+ if src.IsNil() {
+ return src
+ }
+ dst := reflect.New(src.Elem().Type())
+ dst.Elem().Set(deepCopyReflect(src.Elem()))
+
+ return dst
+ case reflect.Slice:
+ if src.IsNil() {
+ return src
+ }
+ dst := reflect.MakeSlice(src.Type(), src.Len(), src.Len())
+ for i := 0; i < src.Len(); i++ {
+ dst.Index(i).Set(deepCopyReflect(src.Index(i)))
+ }
+
+ return dst
+ case reflect.Map:
+ if src.IsNil() {
+ return src
+ }
+ dst := reflect.MakeMapWithSize(src.Type(), src.Len())
+ iter := src.MapRange()
+ for iter.Next() {
+ dst.SetMapIndex(deepCopyReflect(iter.Key()), deepCopyReflect(iter.Value()))
+ }
+
+ return dst
+ case reflect.Struct:
+ dst := reflect.New(src.Type()).Elem()
+ for i := 0; i < src.NumField(); i++ {
+ if dst.Field(i).CanSet() {
+ dst.Field(i).Set(deepCopyReflect(src.Field(i)))
+ }
+ }
+
+ return dst
+ default:
+ return src
+ }
+}
+
+func fullyPopulatedDataFileForCodec(t *testing.T, version int) (PartitionSpec, *Schema, DataFile) {
+ t.Helper()
+ schema := NewSchema(123,
+ NestedField{ID: 1, Name: "id", Type: Int64Type{}, Required: true},
+ NestedField{ID: 2, Name: "name", Type: StringType{}},
+ )
+ spec := NewPartitionSpecID(7,
+ PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: IdentityTransform{}},
+ )
+ builder, err := NewDataFileBuilder(
+ spec,
+ EntryContentData,
+ "s3://bucket/ns/tbl/data/part-0000.parquet",
+ ParquetFile,
+ map[int]any{1000: int64(42)},
+ map[int]string{},
+ map[int]int{},
+ 1024,
+ 1024*1024,
+ )
+ require.NoError(t, err)
+ builder.
+ ColumnSizes(map[int]int64{1: 512, 2: 256}).
+ ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+ NullValueCounts(map[int]int64{1: 0, 2: 4}).
+ NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+ LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+ UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+ SplitOffsets([]int64{0, 4096}).
+ SortOrderID(0).
+ KeyMetadata([]byte("kms-key-1"))
+ if version < 3 {
+ // distinct_counts is deprecated in the spec for all versions
+ // (apache/iceberg#12182); the fixture sets it to cover the
+ // read-compatibility round-trip path for legacy manifests, not
+ // to model what a new DataFile should carry.
+ builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})
+ }
+ if version >= 2 {
+ builder.EqualityFieldIDs([]int{1})
+ }
+ if version >= 3 {
+ builder.FirstRowID(0).
+ ReferencedDataFile("s3://bucket/ns/tbl/data/source.parquet").
+ ContentOffset(128).
+ ContentSizeInBytes(2048)
+ }
+
+ return spec, schema, builder.Build()
+}
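TestSchemaCacheLRUBoundsSize checks that dataFileSchemaCache never grows past its configured size, and the fingerprint tests check that the (partition Avro shape, format version) pair, not the spec id, decides cache identity. The commit gets eviction from hashicorp/golang-lru/v2. Purely to illustrate the policy and the composite key, here is a minimal single-goroutine LRU built on the standard library's container/list; cacheKey and the string values are hypothetical simplifications of dataFileSchemaCacheKey and *dataFileSchemaEntry, and the fingerprint strings are made-up placeholders.

```go
package main

import (
	"container/list"
	"fmt"
)

// cacheKey mirrors dataFileSchemaCacheKey: a partition-shape fingerprint
// plus the format version. Comparable structs can be used as map keys.
type cacheKey struct {
	fingerprint string
	version     int
}

type lruEntry struct {
	key   cacheKey
	value string
}

// lruCache is a minimal LRU for illustration; it is not safe for
// concurrent use.
type lruCache struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[cacheKey]*list.Element
}

func newLRU(capacity int) *lruCache {
	return &lruCache{capacity: capacity, order: list.New(), items: make(map[cacheKey]*list.Element)}
}

// Get returns the cached value and marks it most recently used.
func (c *lruCache) Get(k cacheKey) (string, bool) {
	el, ok := c.items[k]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(*lruEntry).value, true
}

// Add inserts or updates k, then evicts from the back until within capacity.
func (c *lruCache) Add(k cacheKey, v string) {
	if el, ok := c.items[k]; ok {
		el.Value.(*lruEntry).value = v
		c.order.MoveToFront(el)
		return
	}
	c.items[k] = c.order.PushFront(&lruEntry{key: k, value: v})
	c.evict()
}

// Resize keeps existing entries on grow and evicts the least recently
// used ones on shrink.
func (c *lruCache) Resize(capacity int) {
	c.capacity = capacity
	c.evict()
}

func (c *lruCache) evict() {
	for c.order.Len() > c.capacity {
		oldest := c.order.Remove(c.order.Back()).(*lruEntry)
		delete(c.items, oldest.key)
	}
}

func main() {
	c := newLRU(2)
	c.Add(cacheKey{"id_part:long", 2}, "schema-A")
	c.Add(cacheKey{"name_part:string", 2}, "schema-B")
	c.Add(cacheKey{"flag_part:boolean", 2}, "schema-C") // evicts the id_part entry
	_, ok := c.Get(cacheKey{"id_part:long", 2})
	fmt.Println(c.order.Len(), ok) // 2 false
}
```

Because the version is part of the key, the same partition shape under v2 and v3 occupies two entries, which matches the "1–3 entries per table" sizing note on defaultSchemaCacheSize.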
diff --git a/go.mod b/go.mod
index 1c673c82..23ba7504 100644
--- a/go.mod
+++ b/go.mod
@@ -38,6 +38,7 @@ require (
github.com/docker/docker v28.5.2+incompatible
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
+ github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.18.6
github.com/pterm/pterm v0.12.83
github.com/stretchr/testify v1.11.1
diff --git a/internal/datafileavro/bridge.go b/internal/datafileavro/bridge.go
new file mode 100644
index 00000000..6971b92d
--- /dev/null
+++ b/internal/datafileavro/bridge.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package datafileavro is an internal bridge that lets the
+// github.com/apache/iceberg-go/codec package consume the iceberg
+// package's manifest-entry Avro decoder without iceberg having to
+// export it publicly.
+//
+// The iceberg package's init populates [Unmarshal] with a closure
+// over its unexported decode function. The codec package calls
+// [Unmarshal] through this indirection. The any-typed parameters
+// keep this package free of iceberg types: iceberg imports
+// datafileavro, so referencing iceberg types here would create an
+// import cycle.
+package datafileavro
+
+// Unmarshal is set by the iceberg package's init() function. It
+// invokes the iceberg package's unexported manifest-entry Avro
+// decoder and returns an iceberg.DataFile (typed as any to avoid an
+// import cycle); callers must type-assert it back.
+//
+// spec carries an iceberg.PartitionSpec; schema carries a
+// *iceberg.Schema. version is the Iceberg format version (1, 2, or 3).
+var Unmarshal func(data []byte, spec, schema any, version int) (any, error)
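The bridge is a hook-registration pattern: a low-level package declares a function variable, the package that owns the implementation assigns it in init, and consumers call through the variable and type-assert the result. A compressed single-file sketch follows. In the commit, the variable lives in internal/datafileavro and the assignment in package iceberg; here both sit in package main, and partitionSpec, schemaT, decoded and decode are hypothetical stand-ins rather than iceberg-go API.

```go
package main

import (
	"errors"
	"fmt"
)

// Unmarshal plays the role of datafileavro.Unmarshal: a hook declared in
// a low-level package and filled in by the package that owns the decoder.
var Unmarshal func(data []byte, spec, schema any, version int) (any, error)

// Hypothetical stand-ins for iceberg.PartitionSpec, *iceberg.Schema and
// the decoded iceberg.DataFile.
type partitionSpec struct{ ID int }
type schemaT struct{ ID int }
type decoded struct {
	SpecID int
	Bytes  int
}

// init registers the implementation, refusing a second registration the
// same way the iceberg package's init does.
func init() {
	if Unmarshal != nil {
		panic("Unmarshal already set")
	}
	Unmarshal = func(data []byte, spec, schema any, version int) (any, error) {
		s, ok := spec.(partitionSpec)
		if !ok {
			return nil, fmt.Errorf("expected partitionSpec, got %T", spec)
		}
		if _, ok := schema.(*schemaT); !ok {
			return nil, fmt.Errorf("expected *schemaT, got %T", schema)
		}
		return decoded{SpecID: s.ID, Bytes: len(data)}, nil
	}
}

// decode is the consumer side: call the hook, then assert the result back.
func decode(data []byte, spec, schema any, version int) (decoded, error) {
	if Unmarshal == nil {
		return decoded{}, errors.New("decoder not registered")
	}
	v, err := Unmarshal(data, spec, schema, version)
	if err != nil {
		return decoded{}, err
	}
	d, ok := v.(decoded)
	if !ok {
		return decoded{}, fmt.Errorf("unexpected result type %T", v)
	}
	return d, nil
}

func main() {
	d, err := decode([]byte{1, 2, 3}, partitionSpec{ID: 7}, &schemaT{}, 2)
	fmt.Println(d.SpecID, d.Bytes, err) // 7 3 <nil>
	_, err = decode(nil, "not a spec", &schemaT{}, 2)
	fmt.Println(err) // expected partitionSpec, got string
}
```

The trade-off is that argument types are checked at run time instead of compile time, which is why both sides validate with checked type assertions rather than bare ones.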