This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new efe96b6f feat: add wire codec helpers for DataFile and FileScanTask (#1075)
efe96b6f is described below

commit efe96b6f826d674c9edb0e09f6558dedc524e16b
Author: Tobias Pütz <[email protected]>
AuthorDate: Wed May 20 09:54:21 2026 +0200

    feat: add wire codec helpers for DataFile and FileScanTask (#1075)
    
    Adds on-the-wire encoding for DataFile and FileScanTask to enable
    distributed compaction. With #1033, we decomposed compaction into a
    coordinator and worker portion. This enables plumbing them together over
    the wire. FileScanTasks travel coordinator -> worker (data file,
    delete files, scan range + v3 row lineage), and DataFiles travel
    worker -> coordinator, which collects them for the commit.
    
    codec.EncodeDataFile / DecodeDataFile reuse the manifest-entry Avro
    encoding so the bytes on the wire are the same bytes a manifest carries.
    The dataFile struct's avro tags remain the single source of truth.
    Adding a field to DataFile extends what the helpers transport, with no
    wire mirror to keep in sync. Encoding is non-mutating and thread-safe: a
    fresh *dataFile is cloned via reflection over avro tags, so the source
    is untouched.
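The non-mutating encode described above can be sketched with the standard `reflect` package. This is a minimal illustration under stated assumptions, not the code in `data_file_codec.go`. The `dataFile` type and its tags are hypothetical. The copy is shallow, so maps and slices stay shared with the source. That is safe only as long as the encoder replaces them on the clone rather than mutating them in place.

```go
package main

import (
	"fmt"
	"reflect"
)

// dataFile is a hypothetical stand-in for iceberg-go's internal dataFile
// struct. Only fields with an `avro` tag are treated as wire fields.
type dataFile struct {
	Path      string        `avro:"file_path"`
	Count     int64         `avro:"record_count"`
	ColSizes  map[int]int64 `avro:"column_sizes"`
	cachedKey string        // untagged and unexported: never copied
}

// cloneAvroFields returns a fresh *T (T must be a struct) with every
// exported, avro-tagged field copied from src. src is only read, so
// concurrent callers can share it. The copy is shallow.
func cloneAvroFields[T any](src *T) *T {
	dst := new(T)
	sv := reflect.ValueOf(src).Elem()
	dv := reflect.ValueOf(dst).Elem()
	st := sv.Type()
	for i := 0; i < st.NumField(); i++ {
		f := st.Field(i)
		if !f.IsExported() {
			continue
		}
		if _, ok := f.Tag.Lookup("avro"); !ok {
			continue
		}
		dv.Field(i).Set(sv.Field(i))
	}
	return dst
}

func main() {
	src := &dataFile{Path: "s3://bucket/data/part-0.parquet", Count: 1024, cachedKey: "x"}
	dst := cloneAvroFields(src)
	dst.Count = 0 // mutating the clone leaves src untouched
	fmt.Println(src.Count, dst.Path, dst.cachedKey == "")
}
```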
    
    codec.EncodeFileScanTask / DecodeFileScanTask layer on top: each
    embedded DataFile is iceberg-encoded, then wrapped alongside the scan
    range and v3 row lineage in a small Avro envelope.
    
    Design notes:
    - The receiver supplies (spec, schema, version) out of band. Both sides
    in the distributed-compaction design already hold table metadata, and
    the per-(specID, version) avro schema is cached. Happy to switch to a
    self-describing payload if preferred.
    - distinct_counts round-trips on v1/v2 (thanks to #1044, which this PR
    is rebased on); v3 omits it per spec (deprecated in v3,
    apache/iceberg#12182). v3 callers that need it must transport it
    separately.
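The version gating for distinct counts can be modeled with a hypothetical `stats` type and `roundTrip` function. Any counts a DataFile carries survive a v1/v2 round trip and are dropped on v3. The sketch only models this observable behavior; it performs no Avro encoding.

```go
package main

import "fmt"

// stats is a hypothetical subset of DataFile metrics.
type stats struct {
	ValueCounts    map[int]int64
	DistinctCounts map[int]int64 // distinct_counts (field 111), deprecated
}

// roundTrip models which metrics survive encode then decode at a given
// format version: v1/v2 keep distinct counts, v3 drops them.
func roundTrip(s stats, version int) stats {
	out := stats{ValueCounts: s.ValueCounts}
	if version < 3 {
		out.DistinctCounts = s.DistinctCounts
	}
	return out
}

func main() {
	s := stats{ValueCounts: map[int]int64{1: 1024}, DistinctCounts: map[int]int64{1: 64}}
	for _, v := range []int{1, 2, 3} {
		fmt.Printf("v%d distinct_counts=%v\n", v, roundTrip(s, v).DistinctCounts)
	}
}
```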
    - Reflection runs once per encode and is not in a hot path I can see.
    Happy to precompute the avro field index at init if preferred.
    - A blank-identifier `var _ = table.FileScanTask(fileScanTaskShape{})`
    conversion next to the codec is a compile-time drift guard: adding,
    removing, renaming, retyping, or reordering a FileScanTask field
    breaks the build, forcing a deliberate call on whether it must cross
    the wire.
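The drift guard relies on a Go rule. A struct value can be converted to another struct type only if both have identical underlying types, ignoring tags: the same field names, types and order. The sketch below is a minimal stand-alone version. The hypothetical `Task` and `taskShape` types stand in for `table.FileScanTask` and `fileScanTaskShape`.

```go
package main

import "fmt"

// Task is a hypothetical, trimmed stand-in for table.FileScanTask.
type Task struct {
	File          string
	Start, Length int64
	FirstRowID    *int64
}

// taskShape must repeat Task's fields with identical names, types, and
// order. Tags may differ because struct conversion ignores them.
type taskShape struct {
	File          string `json:"file"`
	Start, Length int64
	FirstRowID    *int64
}

// Compile-time guard: adding, removing, renaming, retyping, or
// reordering a Task field makes this conversion fail to compile.
var _ = Task(taskShape{})

func main() {
	task := Task(taskShape{File: "part-0.parquet", Length: 64})
	fmt.Println(task.File, task.Length)
}
```

If a field is added to `Task` and `taskShape` is not updated, the `var _` line becomes a compile error. That is exactly the prompt the guard is meant to provide.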
    
    Format versions 1, 2, and 3 are supported. No change to existing
    manifest read/write paths.
    
    Tests: round-trip across v1/v2/v3 with fully populated DataFiles,
    foreign-impl rejection, partition-data idempotence, and FileScanTask
    shape including v3 row lineage.
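The foreign-impl rejection covered by the tests follows Go's optional-interface pattern. The encoder accepts any DataFile and checks for the marshaler capability with a runtime type assertion. In the trimmed sketch below, `DataFile`, `EntryMarshaler`, `MarshalEntry`, `foreign` and `builtin` are hypothetical stand-ins for the iceberg-go types.

```go
package main

import (
	"errors"
	"fmt"
)

// DataFile and EntryMarshaler are hypothetical, trimmed stand-ins for
// iceberg.DataFile and iceberg.AvroEntryMarshaler.
type DataFile interface {
	FilePath() string
}

type EntryMarshaler interface {
	MarshalEntry(version int) ([]byte, error)
}

// encode takes any DataFile but only succeeds if the value also opts
// in to EntryMarshaler, checked with a runtime type assertion.
func encode(df DataFile, version int) ([]byte, error) {
	m, ok := df.(EntryMarshaler)
	if !ok {
		return nil, fmt.Errorf("encode: requires a DataFile implementing EntryMarshaler, got %T", df)
	}
	return m.MarshalEntry(version)
}

// foreign implements DataFile only.
type foreign struct{}

func (foreign) FilePath() string { return "" }

// builtin implements both interfaces.
type builtin struct{ path string }

func (b builtin) FilePath() string { return b.path }

func (b builtin) MarshalEntry(version int) ([]byte, error) {
	if version < 1 || version > 3 {
		return nil, errors.New("unsupported format version")
	}
	return []byte(b.path), nil
}

func main() {
	if _, err := encode(foreign{}, 2); err != nil {
		fmt.Println(err)
	}
	out, err := encode(builtin{path: "part-0.parquet"}, 2)
	fmt.Println(string(out), err)
}
```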
    
    cc @laskoviymishka, this continues building blocks for distributed
    compaction
---
 .github/workflows/go-ci.yml     |   3 +
 Makefile                        |   6 +-
 codec/data_file.go              | 111 ++++++++++++++
 codec/data_file_test.go         | 321 ++++++++++++++++++++++++++++++++++++++++
 codec/file_scan_task.go         | 208 ++++++++++++++++++++++++++
 codec/file_scan_task_test.go    | 198 +++++++++++++++++++++++++
 data_file_codec.go              | 298 +++++++++++++++++++++++++++++++++++++
 data_file_codec_test.go         | 234 +++++++++++++++++++++++++++++
 go.mod                          |   1 +
 internal/datafileavro/bridge.go |  37 +++++
 10 files changed, 1416 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml
index 06872065..ad782580 100644
--- a/.github/workflows/go-ci.yml
+++ b/.github/workflows/go-ci.yml
@@ -57,3 +57,6 @@ jobs:
         args: --timeout=10m
     - name: Run tests
       run: go test -v ./...
+    # Race detector is opt-in per package/test.
+    - name: Run race detector
+      run: go test -race -v ./codec/...
diff --git a/Makefile b/Makefile
index 355a4aed..9ada641c 100644
--- a/Makefile
+++ b/Makefile
@@ -17,11 +17,15 @@
 # golangci-lint version (keep in sync with CI and README)
 GOLANGCI_LINT_VERSION := v2.8.0
 
-.PHONY: test lint lint-install integration-setup integration-test integration-scanner integration-io integration-rest integration-spark integration-hadoop integration-down integration-logs docs-gen
+.PHONY: test test-race lint lint-install integration-setup integration-test integration-scanner integration-io integration-rest integration-spark integration-hadoop integration-down integration-logs docs-gen
 
 test:
        go test -v ./...
 
+# Race detector is opt-in per package/test.
+test-race:
+       go test -race -v ./codec/...
+
 docs-gen:
        go run ./website/gen
 
diff --git a/codec/data_file.go b/codec/data_file.go
new file mode 100644
index 00000000..b1f78fb1
--- /dev/null
+++ b/codec/data_file.go
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package codec encodes and decodes iceberg-go values for cross-process
+// transport. The bytes it produces are the same Avro bytes a manifest
+// carries for the corresponding value, so callers transport iceberg
+// internals without inventing a parallel wire schema.
+//
+// EncodeDataFile / DecodeDataFile move a single [iceberg.DataFile]
+// using the manifest-entry encoding for a given partition spec, table
+// schema, and format version.
+//
+// EncodeFileScanTask / DecodeFileScanTask layer on top: each embedded
+// DataFile is encoded with [EncodeDataFile], then wrapped alongside the
+// scan range and v3 row lineage in a small Avro envelope.
+//
+// The receiver supplies (spec, schema, version) out of band. Both sides
+// in a distributed-processing design already hold table metadata, and
+// the per-(partition-type, version) avro schema is cached.
+package codec
+
+import (
+       "errors"
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal/datafileavro"
+)
+
+// EncodeDataFile encodes a single DataFile for cross-process transport
+// using the manifest-entry Avro encoding for the given partition spec,
+// table schema and format version (1, 2, or 3). The wire format is the
+// same one a manifest carries for this data file. The receiver MUST
+// call [DecodeDataFile] with the matching (spec, schema, version)
+// triple.
+//
+// df must implement [iceberg.AvroEntryMarshaler]. The iceberg
+// package's built-in DataFile implementation satisfies it; external
+// implementations of [iceberg.DataFile] can opt in by implementing
+// the marshaler interface themselves.
+//
+// EncodeDataFile is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile, provided the
+// underlying implementation honors that contract.
+//
+// v1 note: v1 manifest entries carry a non-nullable snapshot_id which
+// is written as 0 by the iceberg implementation. v1 bytes are not
+// usable as a standalone manifest entry — they only round-trip via
+// [DecodeDataFile].
+//
+// distinct_counts (field 111) is deprecated in the spec for all
+// versions. Already-set values round-trip on v1 and v2 as a
+// read-compatibility artifact; v3 omits the field entirely
+// (apache/iceberg#12182). New DataFiles should not set distinct
+// counts.
+func EncodeDataFile(df iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("codec: EncodeDataFile: unsupported format version %d", version)
+       }
+       m, ok := df.(iceberg.AvroEntryMarshaler)
+       if !ok {
+               return nil, fmt.Errorf("codec: EncodeDataFile: requires a DataFile implementing iceberg.AvroEntryMarshaler, got %T", df)
+       }
+       b, err := m.MarshalAvroEntry(spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("codec: EncodeDataFile: %w", err)
+       }
+
+       return b, nil
+}
+
+// DecodeDataFile decodes bytes produced by [EncodeDataFile] back into a
+// DataFile. The (spec, schema, version) triple must match the encoder;
+// passing a different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func DecodeDataFile(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (iceberg.DataFile, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("codec: DecodeDataFile: unsupported format version %d", version)
+       }
+       if datafileavro.Unmarshal == nil {
+               return nil, errors.New("codec: DecodeDataFile: bridge not initialized; the iceberg package must be imported to register the decoder")
+       }
+       res, err := datafileavro.Unmarshal(data, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("codec: DecodeDataFile: %w", err)
+       }
+       df, ok := res.(iceberg.DataFile)
+       if !ok {
+               return nil, fmt.Errorf("codec: DecodeDataFile: bridge returned unexpected type %T", res)
+       }
+
+       return df, nil
+}
diff --git a/codec/data_file_test.go b/codec/data_file_test.go
new file mode 100644
index 00000000..99796b4d
--- /dev/null
+++ b/codec/data_file_test.go
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+       "reflect"
+       "sort"
+       "strconv"
+       "sync"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/codec"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeDataFileRoundTrip(t *testing.T) {
+       for _, version := range []int{1, 2, 3} {
+               t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+                       spec, schema, original := fullyPopulatedDataFile(t, version)
+
+                       bytes, err := codec.EncodeDataFile(original, spec, schema, version)
+                       require.NoError(t, err)
+                       require.NotEmpty(t, bytes)
+
+                       decoded, err := codec.DecodeDataFile(bytes, spec, schema, version)
+                       require.NoError(t, err)
+                       require.NotNil(t, decoded)
+
+                       assertDataFileEqual(t, original, decoded, version)
+               })
+       }
+}
+
+func TestEncodeDataFileRejectsForeignImpl(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFile(t, 2)
+       _, err := codec.EncodeDataFile(stubDataFile{}, spec, schema, 2)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "iceberg.AvroEntryMarshaler",
+               "error should name the interface the caller needs to satisfy")
+       require.Contains(t, err.Error(), "codec: EncodeDataFile:",
+               "codec errors should be prefixed with the codec entry point name")
+}
+
+func TestEncodeDataFileRejectsBadVersion(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       for _, v := range []int{0, 4, -1} {
+               _, err := codec.EncodeDataFile(df, spec, schema, v)
+               require.Error(t, err, "version %d must be rejected", v)
+               require.Contains(t, err.Error(), "codec: EncodeDataFile:")
+               require.Contains(t, err.Error(), "unsupported format version")
+       }
+}
+
+func TestDecodeDataFileRejectsBadVersion(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       encoded, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       for _, v := range []int{0, 4, -1} {
+               _, err := codec.DecodeDataFile(encoded, spec, schema, v)
+               require.Error(t, err, "version %d must be rejected", v)
+               require.Contains(t, err.Error(), "codec: DecodeDataFile:")
+               require.Contains(t, err.Error(), "unsupported format version")
+       }
+}
+
+func TestDecodeDataFileWrapsDecodeError(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFile(t, 2)
+       _, err := codec.DecodeDataFile([]byte{0xCA, 0xFE}, spec, schema, 2)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "codec: DecodeDataFile:",
+               "decode-side errors from the bridge should be wrapped with the codec entry point")
+}
+
+// TestEncodeDataFileAcceptsExternalMarshaler exercises the
+// extensibility contract: any DataFile that also implements
+// [iceberg.AvroEntryMarshaler] is accepted by the codec, not only the
+// iceberg package's built-in implementation.
+func TestEncodeDataFileAcceptsExternalMarshaler(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFile(t, 2)
+       got, err := codec.EncodeDataFile(externalMarshaler{payload: []byte{0xCA, 0xFE}}, spec, schema, 2)
+       require.NoError(t, err)
+       require.Equal(t, []byte{0xCA, 0xFE}, got,
+               "the marshaler interface contract must let external impls supply their own bytes")
+}
+
+type externalMarshaler struct {
+       stubDataFile
+       payload []byte
+}
+
+func (e externalMarshaler) MarshalAvroEntry(iceberg.PartitionSpec, *iceberg.Schema, int) ([]byte, error) {
+       return e.payload, nil
+}
+
+func TestEncodeDataFileIdempotent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       first, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       second, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       require.Equal(t, first, second, "repeated encodes must produce identical bytes")
+}
+
+func TestEncodeDataFileConcurrent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       want, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+
+       const goroutines = 16
+       const iterations = 32
+       results := make(chan []byte, goroutines*iterations)
+       errs := make(chan error, goroutines*iterations)
+       var wg sync.WaitGroup
+       for range goroutines {
+               wg.Go(func() {
+                       for range iterations {
+                               b, err := codec.EncodeDataFile(df, spec, schema, 2)
+                               if err != nil {
+                                       errs <- err
+
+                                       return
+                               }
+                               results <- b
+                       }
+               })
+       }
+       wg.Wait()
+       close(results)
+       close(errs)
+
+       for err := range errs {
+               t.Fatalf("concurrent encode failed: %v", err)
+       }
+       for b := range results {
+               require.Equal(t, want, b, "concurrent encodes must produce identical bytes")
+       }
+}
+
+func fullyPopulatedDataFile(t *testing.T, version int) (iceberg.PartitionSpec, *iceberg.Schema, iceberg.DataFile) {
+       t.Helper()
+       schema := iceberg.NewSchema(123,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+               iceberg.NestedField{ID: 2, Name: "name", Type: iceberg.StringType{}},
+       )
+       spec := iceberg.NewPartitionSpecID(7,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+       builder, err := iceberg.NewDataFileBuilder(
+               spec,
+               iceberg.EntryContentData,
+               "s3://bucket/ns/tbl/data/part-0000.parquet",
+               iceberg.ParquetFile,
+               map[int]any{1000: int64(42)},
+               map[int]string{},
+               map[int]int{},
+               1024,
+               1024*1024,
+       )
+       require.NoError(t, err)
+       builder.
+               ColumnSizes(map[int]int64{1: 512, 2: 256}).
+               ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+               NullValueCounts(map[int]int64{1: 0, 2: 4}).
+               NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+               LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+               UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+               SplitOffsets([]int64{0, 4096}).
+               SortOrderID(0).
+               KeyMetadata([]byte("kms-key-1"))
+       if version < 3 {
+               // distinct_counts is deprecated for all versions in the spec
+               // (apache/iceberg#12182). The fixture sets it on v1/v2 to
+               // exercise the read-compatibility round-trip path — counts
+               // already present on a DataFile read from a legacy manifest
+               // must survive an encode→decode cycle. New DataFiles should
+               // not carry distinct counts.
+               builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})
+       }
+       if version >= 2 {
+               builder.EqualityFieldIDs([]int{1})
+       }
+       if version >= 3 {
+               builder.FirstRowID(0).
+                       ReferencedDataFile("s3://bucket/ns/tbl/data/source.parquet").
+                       ContentOffset(128).
+                       ContentSizeInBytes(2048)
+       }
+
+       return spec, schema, builder.Build()
+}
+
+func assertDataFileEqual(t *testing.T, want, got iceberg.DataFile, version int) {
+       t.Helper()
+       require.Equal(t, want.FilePath(), got.FilePath())
+       require.Equal(t, want.FileFormat(), got.FileFormat())
+       require.Equal(t, want.Partition(), got.Partition())
+       require.Equal(t, want.Count(), got.Count())
+       require.Equal(t, want.FileSizeBytes(), got.FileSizeBytes())
+       require.Equal(t, want.ColumnSizes(), got.ColumnSizes())
+       require.Equal(t, want.ValueCounts(), got.ValueCounts())
+       require.Equal(t, want.NullValueCounts(), got.NullValueCounts())
+       require.Equal(t, want.NaNValueCounts(), got.NaNValueCounts())
+       require.Equal(t, want.LowerBoundValues(), got.LowerBoundValues())
+       require.Equal(t, want.UpperBoundValues(), got.UpperBoundValues())
+       require.Equal(t, want.KeyMetadata(), got.KeyMetadata())
+       require.Equal(t, want.SplitOffsets(), got.SplitOffsets())
+       require.Equal(t, want.SortOrderID(), got.SortOrderID())
+       require.Equal(t, want.SpecID(), got.SpecID())
+       require.Equal(t, want.ContentType(), got.ContentType())
+       if version < 3 {
+               // distinct_counts (field 111) is deprecated for all versions
+               // but still writable on v1/v2. The codec preserves it on
+               // round-trip for read compatibility with legacy manifests; v3
+               // drops it per spec (apache/iceberg#12182).
+               require.Equal(t, want.DistinctValueCounts(), got.DistinctValueCounts())
+       } else {
+               require.Empty(t, got.DistinctValueCounts(),
+                       "v3 manifest-entry schema omits distinct_counts (deprecated in spec); "+
+                               "see internal/avro_schemas.go data_file_v3")
+       }
+       if version >= 2 {
+               require.Equal(t, want.EqualityFieldIDs(), got.EqualityFieldIDs())
+       }
+       if version >= 3 {
+               require.Equal(t, want.FirstRowID(), got.FirstRowID())
+               require.Equal(t, want.ReferencedDataFile(), got.ReferencedDataFile())
+               require.Equal(t, want.ContentOffset(), got.ContentOffset())
+               require.Equal(t, want.ContentSizeInBytes(), got.ContentSizeInBytes())
+       }
+
+       require.Equal(t, expectedDataFileMethods, dataFileInterfaceMethods(),
+               "DataFile interface drifted: either extend [codec.EncodeDataFile]/[codec.DecodeDataFile] "+
+                       "and the assertions above to cover the change, or update expectedDataFileMethods "+
+                       "with a comment explaining why the new method is intentionally not transported")
+}
+
+// expectedDataFileMethods is the sorted list of methods the
+// [iceberg.DataFile] interface is known to export. The round-trip test
+// asserts this against runtime reflection; any drift here forces a
+// deliberate decision about whether the new method needs to be
+// round-tripped by [codec.EncodeDataFile] / [codec.DecodeDataFile].
+var expectedDataFileMethods = []string{
+       "ColumnSizes",
+       "ContentOffset",
+       "ContentSizeInBytes",
+       "ContentType",
+       "Count",
+       "DistinctValueCounts",
+       "EqualityFieldIDs",
+       "FileFormat",
+       "FilePath",
+       "FileSizeBytes",
+       "FirstRowID",
+       "KeyMetadata",
+       "LowerBoundValues",
+       "NaNValueCounts",
+       "NullValueCounts",
+       "Partition",
+       "ReferencedDataFile",
+       "SortOrderID",
+       "SpecID",
+       "SplitOffsets",
+       "UpperBoundValues",
+       "ValueCounts",
+}
+
+// dataFileInterfaceMethods returns the sorted names of every method on
+// the DataFile interface. If a new method is added upstream the slice
+// changes; the round-trip test then enforces it is covered by an
+// explicit assertion above.
+func dataFileInterfaceMethods() []string {
+       t := reflect.TypeOf((*iceberg.DataFile)(nil)).Elem()
+       out := make([]string, 0, t.NumMethod())
+       for i := 0; i < t.NumMethod(); i++ {
+               out = append(out, t.Method(i).Name)
+       }
+       sort.Strings(out)
+
+       return out
+}
+
+type stubDataFile struct{}
+
+func (stubDataFile) ContentType() iceberg.ManifestEntryContent { return iceberg.EntryContentData }
+func (stubDataFile) FilePath() string                          { return "" }
+func (stubDataFile) FileFormat() iceberg.FileFormat            { return iceberg.ParquetFile }
+func (stubDataFile) Partition() map[int]any                    { return nil }
+func (stubDataFile) Count() int64                              { return 0 }
+func (stubDataFile) FileSizeBytes() int64                      { return 0 }
+func (stubDataFile) ColumnSizes() map[int]int64                { return nil }
+func (stubDataFile) ValueCounts() map[int]int64                { return nil }
+func (stubDataFile) NullValueCounts() map[int]int64            { return nil }
+func (stubDataFile) NaNValueCounts() map[int]int64             { return nil }
+func (stubDataFile) DistinctValueCounts() map[int]int64        { return nil }
+func (stubDataFile) LowerBoundValues() map[int][]byte          { return nil }
+func (stubDataFile) UpperBoundValues() map[int][]byte          { return nil }
+func (stubDataFile) KeyMetadata() []byte                       { return nil }
+func (stubDataFile) SplitOffsets() []int64                     { return nil }
+func (stubDataFile) EqualityFieldIDs() []int                   { return nil }
+func (stubDataFile) SortOrderID() *int                         { return nil }
+func (stubDataFile) SpecID() int32                             { return 0 }
+func (stubDataFile) FirstRowID() *int64                        { return nil }
+func (stubDataFile) ReferencedDataFile() *string               { return nil }
+func (stubDataFile) ContentOffset() *int64                     { return nil }
+func (stubDataFile) ContentSizeInBytes() *int64                { return nil }
diff --git a/codec/file_scan_task.go b/codec/file_scan_task.go
new file mode 100644
index 00000000..8d78703a
--- /dev/null
+++ b/codec/file_scan_task.go
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/twmb/avro"
+)
+
+// EncodeFileScanTask encodes a FileScanTask for cross-process transport.
+// Each carried DataFile is encoded with [EncodeDataFile] and wrapped in
+// a small record that also carries the scan range and v3 row lineage.
+// The (spec, schema, version) triple must match what [DecodeFileScanTask]
+// is given on the receiver.
+//
+// All carried DataFiles (data, positional deletes, equality deletes,
+// and deletion vectors) must share the supplied spec.ID(): each delete
+// file's SpecID is validated and a mismatch returns an error. After
+// partition evolution, delete files may have been written under a
+// different partition spec than the data file; the caller is
+// responsible for partitioning the FileScanTask by per-file specID and
+// calling EncodeFileScanTask once per group.
+func EncodeFileScanTask(task table.FileScanTask, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("codec: EncodeFileScanTask: unsupported format version %d", version)
+       }
+       fileBytes, err := EncodeDataFile(task.File, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("file: %w", err)
+       }
+       del, err := encodeDataFileSlice(task.DeleteFiles, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("delete files: %w", err)
+       }
+       eq, err := encodeDataFileSlice(task.EqualityDeleteFiles, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("equality delete files: %w", err)
+       }
+       dv, err := encodeDataFileSlice(task.DeletionVectorFiles, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("deletion vector files: %w", err)
+       }
+       envelope := fileScanTaskEnvelope{
+               File:                fileBytes,
+               DeleteFiles:         del,
+               EqualityDeleteFiles: eq,
+               DeletionVectorFiles: dv,
+               Start:               task.Start,
+               Length:              task.Length,
+               FirstRowID:          task.FirstRowID,
+               DataSequenceNumber:  task.DataSequenceNumber,
+       }
+
+       return fileScanTaskSchema.Encode(&envelope)
+}
+
+// DecodeFileScanTask reverses [EncodeFileScanTask]. The triple
+// (spec, schema, version) must match the encoder.
+func DecodeFileScanTask(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (table.FileScanTask, error) {
+       if version < 1 || version > 3 {
+               return table.FileScanTask{}, fmt.Errorf("codec: DecodeFileScanTask: unsupported format version %d", version)
+       }
+       var envelope fileScanTaskEnvelope
+       if _, err := fileScanTaskSchema.Decode(data, &envelope); err != nil {
+               return table.FileScanTask{}, fmt.Errorf("decode: %w", err)
+       }
+       file, err := DecodeDataFile(envelope.File, spec, schema, version)
+       if err != nil {
+               return table.FileScanTask{}, fmt.Errorf("file: %w", err)
+       }
+       del, err := decodeDataFileSlice(envelope.DeleteFiles, spec, schema, version)
+       if err != nil {
+               return table.FileScanTask{}, fmt.Errorf("delete files: %w", err)
+       }
+       eq, err := decodeDataFileSlice(envelope.EqualityDeleteFiles, spec, schema, version)
+       if err != nil {
+               return table.FileScanTask{}, fmt.Errorf("equality delete files: %w", err)
+       }
+       dv, err := decodeDataFileSlice(envelope.DeletionVectorFiles, spec, schema, version)
+       if err != nil {
+               return table.FileScanTask{}, fmt.Errorf("deletion vector files: %w", err)
+       }
+
+       return table.FileScanTask{
+               File:                file,
+               DeleteFiles:         del,
+               EqualityDeleteFiles: eq,
+               DeletionVectorFiles: dv,
+               Start:               envelope.Start,
+               Length:              envelope.Length,
+               FirstRowID:          envelope.FirstRowID,
+               DataSequenceNumber:  envelope.DataSequenceNumber,
+       }, nil
+}
+
+// fileScanTaskShape is a compile-time drift guard for FileScanTask.
+// Go only permits struct conversion between two types that have
+// identical underlying field sequences (names, types, and order; tags
+// are ignored), so the var _ below fails to build the moment
+// table.FileScanTask gains, loses, renames, retypes, or reorders a
+// field. That forces a deliberate decision about whether the change
+// must be carried by [EncodeFileScanTask] / [DecodeFileScanTask]; when
+// extending, update fileScanTaskEnvelope, the schema JSON below, the
+// encode/decode bodies, and this shape together.
+type fileScanTaskShape struct {
+       File                iceberg.DataFile
+       DeleteFiles         []iceberg.DataFile
+       EqualityDeleteFiles []iceberg.DataFile
+       DeletionVectorFiles []iceberg.DataFile
+       Start, Length       int64
+       FirstRowID          *int64
+       DataSequenceNumber  *int64
+}
+
+var _ = table.FileScanTask(fileScanTaskShape{})
+
+// fileScanTaskEnvelope is the avro on-wire shape. The DataFile payloads
+// (File and the three delete-file lists) are themselves [EncodeDataFile]
+// bytes; this struct only frames them along with the scan range and v3
+// lineage.
+type fileScanTaskEnvelope struct {
+       File                []byte   `avro:"file"`
+       DeleteFiles         [][]byte `avro:"delete_files"`
+       EqualityDeleteFiles [][]byte `avro:"equality_delete_files"`
+       DeletionVectorFiles [][]byte `avro:"deletion_vector_files"`
+       Start               int64    `avro:"start"`
+       Length              int64    `avro:"length"`
+       FirstRowID          *int64   `avro:"first_row_id"`
+       DataSequenceNumber  *int64   `avro:"data_sequence_number"`
+}
+
+const fileScanTaskSchemaJSON = `{
+  "type": "record",
+  "name": "file_scan_task",
+  "fields": [
+    {"name": "file", "type": "bytes"},
+    {"name": "delete_files", "type": {"type": "array", "items": "bytes"}},
+    {"name": "equality_delete_files", "type": {"type": "array", "items": "bytes"}},
+    {"name": "deletion_vector_files", "type": {"type": "array", "items": "bytes"}},
+    {"name": "start", "type": "long"},
+    {"name": "length", "type": "long"},
+    {"name": "first_row_id", "type": ["null", "long"]},
+    {"name": "data_sequence_number", "type": ["null", "long"]}
+  ]
+}`
+
+var fileScanTaskSchema *avro.Schema
+
+func init() {
+       s, err := avro.Parse(fileScanTaskSchemaJSON)
+       if err != nil {
+               panic("codec: fileScanTaskSchema invalid: " + err.Error())
+       }
+       fileScanTaskSchema = s
+}
+
+func encodeDataFileSlice(files []iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([][]byte, error) {
+       if len(files) == 0 {
+               return nil, nil
+       }
+       out := make([][]byte, 0, len(files))
+       for i, f := range files {
+               if int(f.SpecID()) != spec.ID() {
+                       return nil, fmt.Errorf("entry %d: data file spec id %d does not match codec spec id %d (partition evolution requires per-spec grouping)", i, f.SpecID(), spec.ID())
+               }
+               b, err := EncodeDataFile(f, spec, schema, version)
+               if err != nil {
+                       return nil, fmt.Errorf("entry %d: %w", i, err)
+               }
+               out = append(out, b)
+       }
+
+       return out, nil
+}
+
+func decodeDataFileSlice(blobs [][]byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]iceberg.DataFile, error) {
+       if len(blobs) == 0 {
+               return nil, nil
+       }
+       out := make([]iceberg.DataFile, 0, len(blobs))
+       for i, b := range blobs {
+               df, err := DecodeDataFile(b, spec, schema, version)
+               if err != nil {
+                       return nil, fmt.Errorf("entry %d: %w", i, err)
+               }
+               out = append(out, df)
+       }
+
+       return out, nil
+}
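For reference, the file_scan_task envelope above is plain Avro binary: record fields are written in schema order with no names on the wire, bytes values are length-prefixed, arrays are written as counted blocks ended by a zero count, and each ["null", "long"] union is a branch index followed by the value. The sketch below hand-encodes that envelope with only the standard library to make the layout visible. It is an illustration under stated assumptions, not the codec's implementation: the envelope type and the append* helpers are hypothetical, the nested byte slices stand in for EncodeDataFile output, binary.AppendVarint needs Go 1.19+, and every array is written as a single block, which the Avro spec permits but which github.com/twmb/avro is not guaranteed to match byte for byte.

```go
package main

import "encoding/binary"

// envelope is a hypothetical mirror of the file_scan_task record above.
// The byte slices stand in for EncodeDataFile output and are treated as
// opaque payloads.
type envelope struct {
	File                []byte
	DeleteFiles         [][]byte
	EqualityDeleteFiles [][]byte
	DeletionVectorFiles [][]byte
	Start, Length       int64
	FirstRowID          *int64
	DataSequenceNumber  *int64
}

// appendLong writes an Avro long: zig-zag encoded, then base-128 varint.
// binary.AppendVarint (Go 1.19+) uses exactly that encoding.
func appendLong(b []byte, v int64) []byte { return binary.AppendVarint(b, v) }

// appendBytes writes an Avro bytes value: the length as a long, then the
// raw bytes.
func appendBytes(b, v []byte) []byte {
	return append(appendLong(b, int64(len(v))), v...)
}

// appendBytesArray writes an Avro array of bytes as one block (item count
// followed by the items) and then the zero count that ends the array.
// An empty array is therefore a single zero byte.
func appendBytesArray(b []byte, items [][]byte) []byte {
	if len(items) > 0 {
		b = appendLong(b, int64(len(items)))
		for _, it := range items {
			b = appendBytes(b, it)
		}
	}
	return appendLong(b, 0)
}

// appendNullableLong writes a ["null", "long"] union: branch index 0 for
// null, or branch index 1 followed by the long.
func appendNullableLong(b []byte, v *int64) []byte {
	if v == nil {
		return appendLong(b, 0)
	}
	return appendLong(appendLong(b, 1), *v)
}

// encodeEnvelope writes the fields in schema order. Avro binary records
// carry no field names or tags, so reader and writer must agree on the
// schema out of band.
func encodeEnvelope(e envelope) []byte {
	var b []byte
	b = appendBytes(b, e.File)
	b = appendBytesArray(b, e.DeleteFiles)
	b = appendBytesArray(b, e.EqualityDeleteFiles)
	b = appendBytesArray(b, e.DeletionVectorFiles)
	b = appendLong(b, e.Start)
	b = appendLong(b, e.Length)
	b = appendNullableLong(b, e.FirstRowID)
	b = appendNullableLong(b, e.DataSequenceNumber)
	return b
}
```

With File = {0xAA}, one delete file {0xBB, 0xCC}, Length = 64 and DataSequenceNumber = 17 (everything else empty or nil), this yields the 15 bytes 02 AA 02 04 BB CC 00 00 00 00 80 01 00 02 22. Because the outer record never inspects the nested DataFile blobs, the envelope schema stays fixed while the DataFile encoding evolves with the manifest-entry schema.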
diff --git a/codec/file_scan_task_test.go b/codec/file_scan_task_test.go
new file mode 100644
index 00000000..8775ebd8
--- /dev/null
+++ b/codec/file_scan_task_test.go
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+       "strconv"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/codec"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeFileScanTaskRoundTrip(t *testing.T) {
+       for _, version := range []int{1, 2, 3} {
+               t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+                       spec, schema, original := fullyPopulatedFileScanTask(t, version)
+
+                       bytes, err := codec.EncodeFileScanTask(original, spec, schema, version)
+                       require.NoError(t, err)
+                       require.NotEmpty(t, bytes)
+
+                       decoded, err := codec.DecodeFileScanTask(bytes, spec, schema, version)
+                       require.NoError(t, err)
+
+                       require.Equal(t, original.File.FilePath(), decoded.File.FilePath())
+                       require.Equal(t, original.File.Count(), decoded.File.Count())
+                       require.Equal(t, original.File.Partition(), decoded.File.Partition())
+
+                       require.Len(t, decoded.DeleteFiles, len(original.DeleteFiles))
+                       for i := range original.DeleteFiles {
+                               require.Equal(t, original.DeleteFiles[i].FilePath(), decoded.DeleteFiles[i].FilePath())
+                       }
+                       require.Len(t, decoded.EqualityDeleteFiles, len(original.EqualityDeleteFiles))
+                       for i := range original.EqualityDeleteFiles {
+                               require.Equal(t, original.EqualityDeleteFiles[i].FilePath(), decoded.EqualityDeleteFiles[i].FilePath())
+                       }
+                       require.Len(t, decoded.DeletionVectorFiles, len(original.DeletionVectorFiles))
+                       for i := range original.DeletionVectorFiles {
+                               require.Equal(t, original.DeletionVectorFiles[i].FilePath(), decoded.DeletionVectorFiles[i].FilePath(),
+                                       "DV file path must round-trip")
+                               require.Equal(t, original.DeletionVectorFiles[i].ReferencedDataFile(), decoded.DeletionVectorFiles[i].ReferencedDataFile(),
+                                       "DV file must remember the data file it deletes from")
+                               require.Equal(t, original.DeletionVectorFiles[i].ContentOffset(), decoded.DeletionVectorFiles[i].ContentOffset(),
+                                       "DV file content offset (puffin blob offset) must round-trip")
+                               require.Equal(t, original.DeletionVectorFiles[i].ContentSizeInBytes(), decoded.DeletionVectorFiles[i].ContentSizeInBytes(),
+                                       "DV file content size must round-trip")
+                               require.Equal(t, original.DeletionVectorFiles[i].FileFormat(), decoded.DeletionVectorFiles[i].FileFormat(),
+                                       "DV file format must round-trip")
+                       }
+
+                       require.Equal(t, original.Start, decoded.Start)
+                       require.Equal(t, original.Length, decoded.Length)
+                       require.Equal(t, original.FirstRowID, decoded.FirstRowID)
+                       require.Equal(t, original.DataSequenceNumber, decoded.DataSequenceNumber)
+               })
+       }
+}
+
+func TestEncodeFileScanTaskRejectsMismatchedDeleteFileSpec(t *testing.T) {
+       schema := iceberg.NewSchema(123,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+       )
+       specMain := iceberg.NewPartitionSpecID(7,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+       specOther := iceberg.NewPartitionSpecID(99,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+
+       mainFile := newScanTaskDataFile(t, specMain, "s3://bucket/main.parquet",
+               iceberg.EntryContentData, iceberg.ParquetFile, "", 2)
+       mismatchedDelete := newScanTaskDataFile(t, specOther, "s3://bucket/del.parquet",
+               iceberg.EntryContentPosDeletes, iceberg.ParquetFile, "", 2)
+
+       task := table.FileScanTask{
+               File:        mainFile,
+               DeleteFiles: []iceberg.DataFile{mismatchedDelete},
+       }
+       _, err := codec.EncodeFileScanTask(task, specMain, schema, 2)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "spec id",
+               "error must call out the spec-id mismatch")
+       require.Contains(t, err.Error(), "99",
+               "error must include the offending delete file's spec id")
+       require.Contains(t, err.Error(), "7",
+               "error must include the codec's spec id")
+}
+
+func TestEncodeFileScanTaskEmptyDeleteLists(t *testing.T) {
+       spec, schema, task := fullyPopulatedFileScanTask(t, 2)
+       task.DeleteFiles = nil
+       task.EqualityDeleteFiles = nil
+       task.DeletionVectorFiles = nil
+
+       bytes, err := codec.EncodeFileScanTask(task, spec, schema, 2)
+       require.NoError(t, err)
+
+       decoded, err := codec.DecodeFileScanTask(bytes, spec, schema, 2)
+       require.NoError(t, err)
+       require.Empty(t, decoded.DeleteFiles)
+       require.Empty(t, decoded.EqualityDeleteFiles)
+       require.Empty(t, decoded.DeletionVectorFiles)
+}
+
+func fullyPopulatedFileScanTask(t *testing.T, version int) (iceberg.PartitionSpec, *iceberg.Schema, table.FileScanTask) {
+       t.Helper()
+       schema := iceberg.NewSchema(123,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.Int64Type{}, Required: true},
+       )
+       spec := iceberg.NewPartitionSpecID(7,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+
+       file := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/part-0000.parquet", iceberg.EntryContentData, iceberg.ParquetFile, "", version)
+       delete1 := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/del-0001.parquet", iceberg.EntryContentPosDeletes, iceberg.ParquetFile, "", version)
+
+       firstRow := int64(0)
+       dataSeq := int64(17)
+
+       task := table.FileScanTask{
+               File:               file,
+               DeleteFiles:        []iceberg.DataFile{delete1},
+               Start:              0,
+               Length:             1024 * 1024,
+               FirstRowID:         &firstRow,
+               DataSequenceNumber: &dataSeq,
+       }
+
+       if version >= 2 {
+               eq1 := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/eq-0001.parquet", iceberg.EntryContentEqDeletes, iceberg.ParquetFile, "", version)
+               task.EqualityDeleteFiles = []iceberg.DataFile{eq1}
+       }
+
+       if version >= 3 {
+               // Deletion vectors are v3's first-class delete mechanism: a
+               // Puffin blob that references the data file whose rows it
+               // deletes. The round-trip test asserts the DV-specific fields
+               // (file format, ReferencedDataFile, ContentOffset,
+               // ContentSizeInBytes) so a regression in the DV encode path surfaces.
+               dv := newScanTaskDataFile(t, spec, "s3://bucket/ns/tbl/data/dv-0001.puffin",
+                       iceberg.EntryContentPosDeletes, iceberg.PuffinFile, file.FilePath(), version)
+               task.DeletionVectorFiles = []iceberg.DataFile{dv}
+       }
+
+       return spec, schema, task
+}
+
+func newScanTaskDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, content iceberg.ManifestEntryContent, format iceberg.FileFormat, referencedDataFile string, version int) iceberg.DataFile {
+       t.Helper()
+       builder, err := iceberg.NewDataFileBuilder(
+               spec,
+               content,
+               path,
+               format,
+               map[int]any{1000: int64(42)},
+               map[int]string{},
+               map[int]int{},
+               1024,
+               1024*1024,
+       )
+       require.NoError(t, err)
+       builder.
+               ColumnSizes(map[int]int64{1: 512}).
+               ValueCounts(map[int]int64{1: 1024}).
+               NullValueCounts(map[int]int64{1: 0}).
+               LowerBoundValues(map[int][]byte{1: {0x01}}).
+               UpperBoundValues(map[int][]byte{1: {0xff}})
+       if content == iceberg.EntryContentEqDeletes {
+               builder.EqualityFieldIDs([]int{1})
+       }
+       if version >= 3 {
+               builder.FirstRowID(0)
+       }
+       if referencedDataFile != "" {
+               builder.ReferencedDataFile(referencedDataFile).
+                       ContentOffset(128).
+                       ContentSizeInBytes(2048)
+       }
+
+       return builder.Build()
+}
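The spec-id check in encodeDataFileSlice, exercised by TestEncodeFileScanTaskRejectsMismatchedDeleteFileSpec, means a caller holding DataFiles written under several partition specs (for example after partition evolution) has to split them before encoding. A minimal sketch of that grouping step follows. It assumes a hypothetical dataFile stand-in carrying only a path and a spec id; real code would iterate iceberg.DataFile values, call SpecID(), and look up the matching PartitionSpec in table metadata for each group.

```go
package main

import "sort"

// dataFile is a hypothetical minimal stand-in for iceberg.DataFile,
// carrying only what grouping needs.
type dataFile struct {
	Path   string
	SpecID int32
}

// specGroup holds the files written under one partition spec.
type specGroup struct {
	SpecID int32
	Files  []dataFile
}

// groupBySpecID buckets files by partition spec id so that each bucket
// can be encoded with its own spec. Groups come back in ascending
// spec-id order, and files keep their input order within a group.
func groupBySpecID(files []dataFile) []specGroup {
	byID := make(map[int32][]dataFile)
	for _, f := range files {
		byID[f.SpecID] = append(byID[f.SpecID], f)
	}
	ids := make([]int32, 0, len(byID))
	for id := range byID {
		ids = append(ids, id)
	}
	// Map iteration order is random; sort for deterministic output.
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	out := make([]specGroup, 0, len(ids))
	for _, id := range ids {
		out = append(out, specGroup{SpecID: id, Files: byID[id]})
	}
	return out
}
```

Each group could then be encoded together with the spec whose ID matches it, so no single encode call mixes specs and the mismatch error is never hit.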
diff --git a/data_file_codec.go b/data_file_codec.go
new file mode 100644
index 00000000..15706b7e
--- /dev/null
+++ b/data_file_codec.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "fmt"
+       "reflect"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/apache/iceberg-go/internal/datafileavro"
+       lru "github.com/hashicorp/golang-lru/v2"
+       "github.com/twmb/avro"
+)
+
+// defaultSchemaCacheSize is the default capacity (maximum entry count)
+// of dataFileSchemaCache. 8192 covers a few thousand active tables (each
+// contributing 1–3 entries for its partition spec × format version),
+// which is what a long-running consumer such as a server-side compaction
+// service typically sees. SetSchemaCacheSize tunes this at runtime.
+const defaultSchemaCacheSize = 8192
+
+func init() {
+       if datafileavro.Unmarshal != nil {
+               panic("iceberg: datafileavro.Unmarshal already set")
+       }
+       datafileavro.Unmarshal = func(data []byte, spec, schema any, version int) (any, error) {
+               s, ok := spec.(PartitionSpec)
+               if !ok {
+                       return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected PartitionSpec, got %T", spec)
+               }
+               sc, ok := schema.(*Schema)
+               if !ok {
+                       return nil, fmt.Errorf("iceberg: datafileavro.Unmarshal: expected *Schema, got %T", schema)
+               }
+
+               return unmarshalAvroDataFileEntry(data, s, sc, version)
+       }
+}
+
+// AvroEntryMarshaler is implemented by DataFile values that can be
+// encoded using the manifest-entry Avro encoding. The iceberg
+// package's built-in DataFile implementation satisfies it; external
+// implementations can also satisfy it to participate in the
+// [github.com/apache/iceberg-go/codec] DataFile codec.
+//
+// The encoded bytes are the same bytes a manifest carries for this
+// data file. Implementations must produce output that the iceberg
+// package's manifest-entry Avro decoder accepts.
+type AvroEntryMarshaler interface {
+       MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error)
+}
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the iceberg-package side of the
+// [github.com/apache/iceberg-go/codec] DataFile codec; callers
+// performing cross-process transport should prefer that package's
+// high-level API.
+//
+// MarshalAvroEntry is safe to call concurrently with any other
+// reader or encoder of the same DataFile: a fresh *dataFile is
+// cloned (avro-tagged fields only) and the avro encoder reads, but
+// does not mutate, the cloned values. Pointer-typed avro fields like
+// ColSizes share their backing storage with the source; the
+// thread-safety guarantee relies on the avro encoder being
+// non-mutating.
+//
+// v1 note: the v1 manifest-entry schema has a non-nullable snapshot_id
+// field. MarshalAvroEntry writes 0 there, so v1 bytes are not usable
+// as a standalone manifest entry — they only round-trip via the
+// matching decoder.
+//
+// distinct_counts (field 111) is deprecated in the spec for all
+// versions. MarshalAvroEntry preserves any value already on the
+// source for v1 and v2 as a read-compatibility artifact; v3 omits
+// the field entirely (apache/iceberg#12182). New DataFiles should
+// not set distinct counts.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       clone := cloneDataFileAvroFields(d)
+       clone.PartitionData = avroEncodePartitionData(d.Partition(), maps.nameToID, maps.idToType)
+
+       return s.Encode(newEncodeEntry(version, clone))
+}
+
+// unmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+//
+// It is reachable from the [github.com/apache/iceberg-go/codec]
+// package through the [datafileavro] bridge.
+func unmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema *Schema, version int) (DataFile, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: unsupported format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       entry, df := newDecodeEntry(version)
+       if _, err := s.Decode(data, entry); err != nil {
+               return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: %w", err)
+       }
+       df.specID = int32(spec.ID())
+       df.fieldNameToID = maps.nameToID
+       df.fieldIDToLogicalType = maps.idToType
+       df.fieldIDToFixedSize = maps.idToFixedSize
+
+       return df, nil
+}
+
+// newEncodeEntry returns the right manifest-entry shape for the schema
+// version: v1's manifest_entry has a non-nullable snapshot_id and uses
+// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable
+// pointers.
+func newEncodeEntry(version int, df *dataFile) any {
+       if version == 1 {
+               return &fallbackManifestEntry{
+                       manifestEntry: manifestEntry{EntryStatus: EntryStatusADDED, Data: df},
+               }
+       }
+
+       return &manifestEntry{EntryStatus: EntryStatusADDED, Data: df}
+}
+
+// newDecodeEntry mirrors [newEncodeEntry] for the read side: it returns
+// the pointer to pass to avro.Schema.Decode along with the pre-allocated
+// *dataFile that will be populated.
+func newDecodeEntry(version int) (any, *dataFile) {
+       df := &dataFile{}
+       if version == 1 {
+               return &fallbackManifestEntry{manifestEntry: manifestEntry{Data: df}}, df
+       }
+
+       return &manifestEntry{Data: df}, df
+}
+
+// cloneDataFileAvroFields returns a fresh *dataFile populated with src's
+// avro-tagged fields. Internal state (sync.Once, lazy-init caches,
+// specID, the field-id lookup maps) is intentionally left at zero
+// values because the avro encoder reads only the avro-tagged fields.
+//
+// Using reflection over the tag set means a new avro-tagged field
+// upstream is auto-copied without an update here — the dataFile struct
+// remains the single source of truth for the wire shape. It also
+// sidesteps the go-vet copies-lock warning that would fire on a
+// struct-literal copy of *dataFile (it embeds sync.Once).
+//
+// Note: this is a shallow copy. Pointer-typed avro fields (ColSizes,
+// LowerBounds, etc.) share their backing storage with the source.
+// The no-mutation guarantee of MarshalAvroEntry depends on the avro
+// encoder being read-only on the values it walks;
+// TestMarshalAvroEntryDoesNotMutateAnyAvroField asserts this end-to-end
+// across every avro-tagged field, so an encoder regression surfaces in tests.
+func cloneDataFileAvroFields(src *dataFile) *dataFile {
+       out := &dataFile{}
+       srcVal := reflect.ValueOf(src).Elem()
+       outVal := reflect.ValueOf(out).Elem()
+       t := srcVal.Type()
+       for i := 0; i < t.NumField(); i++ {
+               if _, hasAvroTag := t.Field(i).Tag.Lookup("avro"); hasAvroTag {
+                       outVal.Field(i).Set(srcVal.Field(i))
+               }
+       }
+
+       return out
+}
+
+// avroEncodePartitionData converts an id-keyed partition tuple (carrying
+// iceberg-typed values like Date or Decimal) into the name-keyed
+// avro-friendly map the manifest-entry schema expects. Idempotent:
+// values already in primitive form pass through unchanged.
+func avroEncodePartitionData(idKeyed map[int]any, nameToID map[string]int, logicalTypes map[int]string) map[string]any {
+       converted := avroPartitionData(idKeyed, logicalTypes)
+       out := make(map[string]any, len(converted))
+       for name, id := range nameToID {
+               if v, ok := converted[id]; ok {
+                       out[name] = v
+               }
+       }
+
+       return out
+}
+
+type dataFileFieldMaps struct {
+       nameToID      map[string]int
+       idToType      map[int]string
+       idToFixedSize map[int]int
+}
+
+// dataFileSchemaCacheKey identifies a cached avro schema by the
+// structural fingerprint of the partition Avro shape and the format
+// version. The fingerprint is taken from the avro schema produced by
+// [partitionTypeToAvroSchema] rather than [StructType.String]: the
+// avro shape ignores doc strings and other metadata that don't change
+// the wire format, so structurally identical specs that differ only
+// in documentation share a single cache entry.
+type dataFileSchemaCacheKey struct {
+       partAvroFingerprint string
+       version             int
+}
+
+type dataFileSchemaEntry struct {
+       schema *avro.Schema
+       maps   dataFileFieldMaps
+}
+
+var dataFileSchemaCache = mustNewSchemaCache(defaultSchemaCacheSize)
+
+func mustNewSchemaCache(size int) *lru.Cache[dataFileSchemaCacheKey, *dataFileSchemaEntry] {
+       c, err := lru.New[dataFileSchemaCacheKey, *dataFileSchemaEntry](size)
+       if err != nil {
+               panic(fmt.Sprintf("iceberg: schema cache size %d invalid: %v", size, err))
+       }
+
+       return c
+}
+
+// SetSchemaCacheSize resizes the manifest-entry schema cache used by
+// the DataFile codec. The default capacity is sized for a few thousand
+// active partition specs; long-running consumers with larger working
+// sets (e.g. a compaction service touching many tables) should raise
+// it. Existing entries are preserved on grow; on shrink, least-recently
+// used entries are evicted down to the new size. Not safe to call
+// concurrently with codec operations.
+func SetSchemaCacheSize(size int) error {
+       if size <= 0 {
+               return fmt.Errorf("iceberg: SetSchemaCacheSize: size must be positive, got %d", size)
+       }
+       dataFileSchemaCache.Resize(size)
+
+       return nil
+}
+
+// manifestEntrySchemaFor returns the cached avro schema and partition
+// field-id lookups for the given partition type and format version.
+// The cache key fingerprints the partition Avro shape, so specs that
+// differ only in field documentation share a single entry.
+func manifestEntrySchemaFor(spec PartitionSpec, schema *Schema, version int) (*avro.Schema, dataFileFieldMaps, error) {
+       partType := spec.PartitionType(schema)
+       partSchema, err := partitionTypeToAvroSchema(partType)
+       if err != nil {
+               return nil, dataFileFieldMaps{}, err
+       }
+       key := dataFileSchemaCacheKey{partAvroFingerprint: partSchema.String(), version: version}
+       if cached, ok := dataFileSchemaCache.Get(key); ok {
+               return cached.schema, cached.maps, nil
+       }
+       fullSchema, err := internal.NewManifestEntrySchema(partSchema, version)
+       if err != nil {
+               return nil, dataFileFieldMaps{}, err
+       }
+       n2i, i2t, i2s := getFieldIDMap(fullSchema)
+       entry := &dataFileSchemaEntry{
+               schema: fullSchema,
+               maps: dataFileFieldMaps{
+                       nameToID:      n2i,
+                       idToType:      i2t,
+                       idToFixedSize: i2s,
+               },
+       }
+       dataFileSchemaCache.Add(key, entry)
+
+       return entry.schema, entry.maps, nil
+}
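cloneDataFileAvroFields above copies only the avro-tagged fields through reflection instead of copying the whole struct. The standalone sketch below shows the same technique on a hypothetical record type that mixes tagged wire fields with a sync.Once and an internal cache: untagged fields stay at their zero value, no sync.Once is ever copied, and the copy is shallow. One constraint worth noting: reflect only permits Set with values read from exported fields, so the technique assumes every avro-tagged field is exported.

```go
package main

import (
	"reflect"
	"sync"
)

// record is a hypothetical struct mixing wire fields (avro-tagged) with
// internal state that must not be copied.
type record struct {
	once     sync.Once
	cache    map[string]int
	Path     string        `avro:"file_path"`
	Count    int64         `avro:"record_count"`
	ColSizes map[int]int64 `avro:"column_sizes"`
}

// cloneTagged returns a fresh *record holding only src's avro-tagged
// fields. Copying field by field avoids a struct copy of sync.Once, and
// a newly tagged field is picked up without changing this function.
// The copy is shallow: maps and pointers still share storage with src.
// Tagged fields must be exported, or reflect's Set panics.
func cloneTagged(src *record) *record {
	out := &record{}
	sv := reflect.ValueOf(src).Elem()
	ov := reflect.ValueOf(out).Elem()
	t := sv.Type()
	for i := 0; i < t.NumField(); i++ {
		if _, ok := t.Field(i).Tag.Lookup("avro"); ok {
			ov.Field(i).Set(sv.Field(i))
		}
	}
	return out
}
```

Because the clone shares maps with its source, any no-mutation guarantee still rests on the consumer of the clone only reading them, which is what TestMarshalAvroEntryDoesNotMutateAnyAvroField checks for the real dataFile type.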
diff --git a/data_file_codec_test.go b/data_file_codec_test.go
new file mode 100644
index 00000000..c277963b
--- /dev/null
+++ b/data_file_codec_test.go
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "reflect"
+       "strconv"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+)
+
+func TestSchemaCacheLRUBoundsSize(t *testing.T) {
+       require.NoError(t, SetSchemaCacheSize(2))
+       t.Cleanup(func() { require.NoError(t, SetSchemaCacheSize(defaultSchemaCacheSize)) })
+
+       sc := NewSchema(0,
+               NestedField{ID: 1, Name: "a", Type: Int64Type{}, Required: true},
+               NestedField{ID: 2, Name: "b", Type: StringType{}, Required: true},
+               NestedField{ID: 3, Name: "c", Type: BooleanType{}, Required: true},
+       )
+       for i, src := range []int{1, 2, 3} {
+               spec := NewPartitionSpec(PartitionField{
+                       SourceIDs: []int{src},
+                       FieldID:   1000 + i,
+                       Name:      "part_" + strconv.Itoa(src),
+                       Transform: IdentityTransform{},
+               })
+               _, _, err := manifestEntrySchemaFor(spec, sc, 2)
+               require.NoError(t, err)
+       }
+       require.LessOrEqual(t, dataFileSchemaCache.Len(), 2,
+               "LRU must enforce its size bound by evicting least-recently-used entries on insert")
+}
+
+func TestSetSchemaCacheSizeRejectsNonPositive(t *testing.T) {
+       for _, size := range []int{0, -1} {
+               require.Error(t, SetSchemaCacheSize(size),
+                       "size %d must be rejected", size)
+       }
+}
+
+func TestManifestEntrySchemaForCachesByPartitionAvroFingerprint(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFileForCodec(t, 2)
+       s1, _, err := manifestEntrySchemaFor(spec, schema, 2)
+       require.NoError(t, err)
+       s2, _, err := manifestEntrySchemaFor(spec, schema, 2)
+       require.NoError(t, err)
+       require.Same(t, s1, s2,
+               "identical (partition type, version) inputs must reuse the cached schema")
+}
+
+// TestManifestEntrySchemaForDistinguishesSameSpecIDDifferentPartitionTypes
+// guards against the cache-key collision that would occur if the cache
+// were keyed only by spec.ID(). Two tables can both use
+// InitialPartitionSpecID = 0 but expose different partition column
+// types; keying on the partition-type fingerprint keeps their schemas
+// separate.
+func TestManifestEntrySchemaForDistinguishesSameSpecIDDifferentPartitionTypes(t *testing.T) {
+       intSchema := NewSchema(0,
+               NestedField{ID: 1, Name: "id", Type: Int64Type{}, Required: true},
+       )
+       intSpec := NewPartitionSpec(
+               PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: IdentityTransform{}},
+       )
+       require.Equal(t, InitialPartitionSpecID, intSpec.ID(),
+               "sanity: NewPartitionSpec uses InitialPartitionSpecID")
+
+       strSchema := NewSchema(0,
+               NestedField{ID: 1, Name: "name", Type: StringType{}, Required: true},
+       )
+       strSpec := NewPartitionSpec(
+               PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "name_part", Transform: IdentityTransform{}},
+       )
+       require.Equal(t, intSpec.ID(), strSpec.ID(),
+               "sanity: both specs share the same spec id, exposing the collision risk")
+
+       intAvro, intMaps, err := manifestEntrySchemaFor(intSpec, intSchema, 2)
+       require.NoError(t, err)
+       strAvro, strMaps, err := manifestEntrySchemaFor(strSpec, strSchema, 2)
+       require.NoError(t, err)
+
+       require.NotSame(t, intAvro, strAvro,
+               "different partition column types must not share a cached avro schema")
+       require.NotEqual(t, intMaps.nameToID, strMaps.nameToID,
+               "partition field name→id maps must differ between the two specs")
+}
+
+func TestMarshalAvroEntryDoesNotMutateAnyAvroField(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFileForCodec(t, 2)
+       impl := df.(*dataFile)
+
+       before := snapshotAvroFields(impl)
+       _, err := impl.MarshalAvroEntry(spec, schema, 2)
+       require.NoError(t, err)
+       after := snapshotAvroFields(impl)
+
+       require.Equal(t, before, after,
+               "MarshalAvroEntry must not mutate any avro-tagged field of the source DataFile, "+
+                       "including pointer-typed fields whose backing storage is shared with the clone")
+}
+
+// snapshotAvroFields returns a deep copy of every avro-tagged field on
+// d, keyed by field name. Slices, maps, byte arrays, and pointer
+// targets are reconstructed so the snapshot is fully independent of d
+// and stays stable across subsequent mutations of d.
+func snapshotAvroFields(d *dataFile) map[string]any {
+       out := make(map[string]any)
+       v := reflect.ValueOf(d).Elem()
+       t := v.Type()
+       for i := 0; i < t.NumField(); i++ {
+               f := t.Field(i)
+               if _, ok := f.Tag.Lookup("avro"); !ok {
+                       continue
+               }
+               out[f.Name] = deepCopyReflect(v.Field(i)).Interface()
+       }
+
+       return out
+}
+
+// deepCopyReflect returns a reflect.Value whose pointer/slice/map
+// descendants do not alias src. Other kinds (primitives, arrays,
+// interfaces, funcs) are returned as-is; unexported struct fields stay zero.
+func deepCopyReflect(src reflect.Value) reflect.Value {
+       switch src.Kind() {
+       case reflect.Pointer:
+               if src.IsNil() {
+                       return src
+               }
+               dst := reflect.New(src.Elem().Type())
+               dst.Elem().Set(deepCopyReflect(src.Elem()))
+
+               return dst
+       case reflect.Slice:
+               if src.IsNil() {
+                       return src
+               }
+               dst := reflect.MakeSlice(src.Type(), src.Len(), src.Len())
+               for i := 0; i < src.Len(); i++ {
+                       dst.Index(i).Set(deepCopyReflect(src.Index(i)))
+               }
+
+               return dst
+       case reflect.Map:
+               if src.IsNil() {
+                       return src
+               }
+               dst := reflect.MakeMapWithSize(src.Type(), src.Len())
+               iter := src.MapRange()
+               for iter.Next() {
+                       dst.SetMapIndex(deepCopyReflect(iter.Key()), deepCopyReflect(iter.Value()))
+               }
+
+               return dst
+       case reflect.Struct:
+               dst := reflect.New(src.Type()).Elem()
+               for i := 0; i < src.NumField(); i++ {
+                       if dst.Field(i).CanSet() {
+                               dst.Field(i).Set(deepCopyReflect(src.Field(i)))
+                       }
+               }
+
+               return dst
+       default:
+               return src
+       }
+}
+
+func fullyPopulatedDataFileForCodec(t *testing.T, version int) (PartitionSpec, *Schema, DataFile) {
+       t.Helper()
+       schema := NewSchema(123,
+               NestedField{ID: 1, Name: "id", Type: Int64Type{}, Required: true},
+               NestedField{ID: 2, Name: "name", Type: StringType{}},
+       )
+       spec := NewPartitionSpecID(7,
+               PartitionField{SourceIDs: []int{1}, FieldID: 1000, Name: "id_part", Transform: IdentityTransform{}},
+       )
+       builder, err := NewDataFileBuilder(
+               spec,
+               EntryContentData,
+               "s3://bucket/ns/tbl/data/part-0000.parquet",
+               ParquetFile,
+               map[int]any{1000: int64(42)},
+               map[int]string{},
+               map[int]int{},
+               1024,
+               1024*1024,
+       )
+       require.NoError(t, err)
+       builder.
+               ColumnSizes(map[int]int64{1: 512, 2: 256}).
+               ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+               NullValueCounts(map[int]int64{1: 0, 2: 4}).
+               NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+               LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+               UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+               SplitOffsets([]int64{0, 4096}).
+               SortOrderID(0).
+               KeyMetadata([]byte("kms-key-1"))
+       if version < 3 {
+               // distinct_counts is deprecated in the spec for all versions
+               // (apache/iceberg#12182); the fixture sets it to cover the
+               // read-compatibility round-trip path for legacy manifests, not
+               // to model what a new DataFile should carry.
+               builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})
+       }
+       if version >= 2 {
+               builder.EqualityFieldIDs([]int{1})
+       }
+       if version >= 3 {
+               builder.FirstRowID(0).
+                       ReferencedDataFile("s3://bucket/ns/tbl/data/source.parquet").
+                       ContentOffset(128).
+                       ContentSizeInBytes(2048)
+       }
+
+       return spec, schema, builder.Build()
+}
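The non-mutation test above exists because copying a struct by value duplicates map, slice, and pointer headers but not their backing storage. Any in-place rewrite of the copy therefore leaks into the source. The sketch below reproduces that failure in isolation and shows that a field-by-field deep clone avoids it. The `entry` type and the `normalize` step are hypothetical stand-ins, not iceberg-go's `dataFile` or its encoder. The clone here is hand-written rather than driven by reflection over avro tags.

```go
package main

import (
	"fmt"
	"reflect"
)

// entry is a hypothetical stand-in for a DataFile-like struct whose
// avro-tagged fields include reference types (map, slice, pointer).
type entry struct {
	Path         string         `avro:"file_path"`
	ValueCounts  map[int]int64  `avro:"value_counts"`
	SplitOffsets []int64        `avro:"split_offsets"`
	SortOrderID  *int           `avro:"sort_order_id"`
}

// shallowClone copies the struct value; the map, slice, and pointer
// fields of the copy still alias the source's backing storage.
func shallowClone(e *entry) *entry {
	c := *e
	return &c
}

// deepClone rebuilds every reference-typed field so the copy shares no
// storage with the source.
func deepClone(e *entry) *entry {
	c := &entry{Path: e.Path}
	if e.ValueCounts != nil {
		c.ValueCounts = make(map[int]int64, len(e.ValueCounts))
		for k, v := range e.ValueCounts {
			c.ValueCounts[k] = v
		}
	}
	if e.SplitOffsets != nil {
		c.SplitOffsets = append([]int64(nil), e.SplitOffsets...)
	}
	if e.SortOrderID != nil {
		id := *e.SortOrderID
		c.SortOrderID = &id
	}
	return c
}

// normalize rewrites a clone in place, the way an encoder might adjust
// fields before serializing them (hypothetical).
func normalize(c *entry) {
	c.ValueCounts[1] = 0
	c.SplitOffsets[0] = -1
	*c.SortOrderID = 99
}

// mutatesSource snapshots a fresh source, runs normalize on a clone made
// by the given function, and reports whether the source changed.
func mutatesSource(clone func(*entry) *entry) bool {
	id := 0
	src := &entry{
		Path:         "s3://bucket/data.parquet",
		ValueCounts:  map[int]int64{1: 1024},
		SplitOffsets: []int64{0, 4096},
		SortOrderID:  &id,
	}
	before := deepClone(src)
	normalize(clone(src))
	return !reflect.DeepEqual(before, src)
}

func main() {
	fmt.Println("shallow clone mutates source:", mutatesSource(shallowClone))
	fmt.Println("deep clone mutates source:", mutatesSource(deepClone))
}
```

Running it prints `true` for the shallow clone and `false` for the deep clone. The snapshot-then-compare structure mirrors `TestMarshalAvroEntryDoesNotMutateAnyAvroField`.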
diff --git a/go.mod b/go.mod
index 1c673c82..23ba7504 100644
--- a/go.mod
+++ b/go.mod
@@ -38,6 +38,7 @@ require (
        github.com/docker/docker v28.5.2+incompatible
        github.com/google/go-cmp v0.7.0
        github.com/google/uuid v1.6.0
+       github.com/hashicorp/golang-lru/v2 v2.0.7
        github.com/klauspost/compress v1.18.6
        github.com/pterm/pterm v0.12.83
        github.com/stretchr/testify v1.11.1
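This change adds `github.com/hashicorp/golang-lru/v2`, and the commit message says the per-(specID, version) avro schema is cached. The `manifestEntrySchemaFor` test above additionally requires that specs whose partition columns have different types never share a cached schema. The sketch below shows one way to satisfy that, assuming a key that folds a fingerprint of the partition field types in alongside the spec ID and version. It uses only the standard library. `partitionField`, `cacheKey`, `fingerprint`, and `schemaCache` are hypothetical names, not iceberg-go's. A mutex-guarded map stands in for an LRU and never evicts.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// partitionField is a hypothetical, simplified partition field: the
// field name plus the result type of its transform.
type partitionField struct {
	Name string
	Type string
}

// cacheKey folds a fingerprint of the partition field types into the
// key, so two specs that share an ID and format version but differ in
// column types never collide.
type cacheKey struct {
	specID      int
	version     int
	fingerprint string
}

// fingerprint quotes each name and type so that separators inside a
// name cannot make two different field lists encode identically.
func fingerprint(fields []partitionField) string {
	var b strings.Builder
	for _, f := range fields {
		fmt.Fprintf(&b, "%q:%q;", f.Name, f.Type)
	}
	return b.String()
}

// schemaCache memoizes an expensive build step. A string stands in for
// the compiled avro schema; the map never evicts.
type schemaCache struct {
	mu      sync.Mutex
	entries map[cacheKey]*string
	builds  int // number of cache misses, for illustration
}

func newSchemaCache() *schemaCache {
	return &schemaCache{entries: make(map[cacheKey]*string)}
}

// get returns the cached schema for the key, building it on a miss.
func (c *schemaCache) get(specID, version int, fields []partitionField) *string {
	key := cacheKey{specID: specID, version: version, fingerprint: fingerprint(fields)}
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.entries[key]; ok {
		return s
	}
	c.builds++
	s := fmt.Sprintf("schema(spec=%d, v=%d, %s)", specID, version, key.fingerprint)
	c.entries[key] = &s
	return &s
}

func main() {
	c := newSchemaCache()
	intSpec := []partitionField{{Name: "id_part", Type: "long"}}
	strSpec := []partitionField{{Name: "id_part", Type: "string"}}

	a := c.get(7, 2, intSpec)
	b := c.get(7, 2, strSpec)
	again := c.get(7, 2, intSpec)
	fmt.Println(a != b, a == again, c.builds) // true true 2
}
```

With the fingerprint in the key, the int and string specs get separate entries, and the repeated lookup for the int spec is a hit. A production cache would also need eviction, which the LRU dependency presumably provides.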
diff --git a/internal/datafileavro/bridge.go b/internal/datafileavro/bridge.go
new file mode 100644
index 00000000..6971b92d
--- /dev/null
+++ b/internal/datafileavro/bridge.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package datafileavro is an internal bridge that lets the
+// github.com/apache/iceberg-go/codec package consume the iceberg
+// package's manifest-entry Avro decoder without iceberg having to
+// export it publicly.
+//
+// The iceberg package's init populates [Unmarshal] with a closure
+// over its unexported decode function. The codec package calls
+// [Unmarshal] through this indirection. The any-typed parameters
+// break what would otherwise be a circular import between iceberg
+// and codec.
+package datafileavro
+
+// Unmarshal is set by the iceberg package's init() function. It
+// invokes the iceberg package's unexported manifest-entry Avro
+// decoder and returns an iceberg.DataFile (typed as any to avoid an
+// import cycle); callers must type-assert it back.
+//
+// spec carries an iceberg.PartitionSpec; schema carries a
+// *iceberg.Schema. version is the Iceberg format version (1, 2, or 3).
+var Unmarshal func(data []byte, spec, schema any, version int) (any, error)
