laskoviymishka commented on code in PR #1075:
URL: https://github.com/apache/iceberg-go/pull/1075#discussion_r3237373686


##########
data_file_codec.go:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "fmt"
+       "reflect"
+       "sync"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/twmb/avro"
+)
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the low-level avro primitive used by the
+// [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this method directly. The receiver MUST decode
+// with [UnmarshalAvroDataFileEntry] and the matching
+// (spec, schema, version) triple.
+//
+// MarshalAvroEntry is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile: it encodes a
+// shallow copy of df's avro-tagged fields, leaving df untouched.
+//
+// distinct_counts round-trips on v1 and v2. The v3 manifest-entry
+// schema omits the field (deprecated in the v3 spec, see
+// apache/iceberg#12182), so it does not survive encode→decode on v3 —
+// callers on v3 that need distinct counts must transport them
+// separately.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, 
version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported 
format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       clone := cloneDataFileAvroFields(d)
+       clone.PartitionData = avroEncodePartitionData(d.Partition(), 
maps.nameToID, maps.idToType)
+
+       return s.Encode(newEncodeEntry(version, clone))
+}
+
+// UnmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// UnmarshalAvroDataFileEntry is the low-level avro primitive used by
+// the [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this function directly.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func UnmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema 
*Schema, version int) (DataFile, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
unsupported format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       entry, df := newDecodeEntry(version)
+       if _, err := s.Decode(data, entry); err != nil {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
%w", err)
+       }
+       df.specID = int32(spec.ID())
+       df.fieldNameToID = maps.nameToID
+       df.fieldIDToLogicalType = maps.idToType
+       df.fieldIDToFixedSize = maps.idToFixedSize
+
+       return df, nil
+}
+
+// newEncodeEntry returns the right manifest-entry shape for the schema
+// version: v1's manifest_entry has a non-nullable snapshot_id and uses
+// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable
+// pointers.
+func newEncodeEntry(version int, df *dataFile) any {
+       if version == 1 {
+               return &fallbackManifestEntry{
+                       manifestEntry: manifestEntry{EntryStatus: 
EntryStatusADDED, Data: df},
+               }
+       }
+
+       return &manifestEntry{EntryStatus: EntryStatusADDED, Data: df}
+}
+
+// newDecodeEntry mirrors [newEncodeEntry] for the read side: it returns
+// the pointer to pass to avro.Schema.Decode along with the pre-allocated
+// *dataFile that will be populated.
+func newDecodeEntry(version int) (any, *dataFile) {
+       df := &dataFile{}
+       if version == 1 {
+               return &fallbackManifestEntry{manifestEntry: 
manifestEntry{Data: df}}, df
+       }
+
+       return &manifestEntry{Data: df}, df
+}
+
+// cloneDataFileAvroFields returns a fresh *dataFile populated with src's
+// avro-tagged fields. Internal state (sync.Once, lazy-init caches,
+// specID, the field-id lookup maps) is intentionally left at zero
+// values because the avro encoder reads only the avro-tagged fields.
+//
+// Using reflection over the tag set means a new avro-tagged field
+// upstream is auto-copied without an update here — the dataFile struct
+// remains the single source of truth for the wire shape. It also
+// sidesteps the go-vet copies-lock warning that would fire on a
+// struct-literal copy of *dataFile (it embeds sync.Once).
+func cloneDataFileAvroFields(src *dataFile) *dataFile {
+       out := &dataFile{}
+       srcVal := reflect.ValueOf(src).Elem()
+       outVal := reflect.ValueOf(out).Elem()
+       t := srcVal.Type()
+       for i := 0; i < t.NumField(); i++ {
+               if _, hasAvroTag := t.Field(i).Tag.Lookup("avro"); hasAvroTag {
+                       outVal.Field(i).Set(srcVal.Field(i))
+               }
+       }
+
+       return out
+}
+
+// avroEncodePartitionData converts an id-keyed partition tuple (carrying
+// iceberg-typed values like Date or Decimal) into the name-keyed
+// avro-friendly map the manifest-entry schema expects. Idempotent:
+// values already in primitive form pass through unchanged.
+func avroEncodePartitionData(idKeyed map[int]any, nameToID map[string]int, 
logicalTypes map[int]string) map[string]any {
+       converted := avroPartitionData(idKeyed, logicalTypes)
+       out := make(map[string]any, len(converted))
+       for name, id := range nameToID {
+               if v, ok := converted[id]; ok {
+                       out[name] = v
+               }
+       }
+
+       return out
+}
+
+type dataFileFieldMaps struct {
+       nameToID      map[string]int
+       idToType      map[int]string
+       idToFixedSize map[int]int
+}
+
+type dataFileSchemaCacheKey struct {

Review Comment:
   This cache key collides across tables. specID is per-table — 
`InitialPartitionSpecID = 0` means essentially every table created via 
`NewPartitionSpec` has specID 0. Two such tables with different partition 
column types share the same key in this process-global `sync.Map`: whichever 
encodes first wins, and the second table silently gets back the first table's 
Avro partition shape. Bytes round-trip but `Partition()` comes out mis-typed 
(e.g. `int64` decoded as `int32`) or gets dropped. The doc on 
`manifestEntrySchemaFor` even acknowledges the dependency — *"apart from how it 
shapes the partition struct"* — but the key omits the thing that shapes the 
struct.
   
   Including schema id handles the realistic case:
   
   ```go
   type dataFileSchemaCacheKey struct {
       specID   int
       schemaID int
       version  int
   }
   ```
   
   A stricter alternative is to fingerprint `spec.PartitionType(schema)`, but 
schema id is usually enough in practice. `TestManifestEntrySchemaForCaches` 
needs reworking too — as written it locks in the collision behavior. What do you think?



##########
codec/file_scan_task_test.go:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+       "strconv"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/codec"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeFileScanTaskRoundTrip(t *testing.T) {
+       for _, version := range []int{2, 3} {
+               t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+                       spec, schema, original := fullyPopulatedFileScanTask(t, 
version)
+
+                       bytes, err := codec.EncodeFileScanTask(original, spec, 
schema, version)
+                       require.NoError(t, err)
+                       require.NotEmpty(t, bytes)
+
+                       decoded, err := codec.DecodeFileScanTask(bytes, spec, 
schema, version)
+                       require.NoError(t, err)
+
+                       require.Equal(t, original.File.FilePath(), 
decoded.File.FilePath())
+                       require.Equal(t, original.File.Count(), 
decoded.File.Count())
+                       require.Equal(t, original.File.Partition(), 
decoded.File.Partition())
+
+                       require.Len(t, decoded.DeleteFiles, 
len(original.DeleteFiles))
+                       for i := range original.DeleteFiles {
+                               require.Equal(t, 
original.DeleteFiles[i].FilePath(), decoded.DeleteFiles[i].FilePath())
+                       }
+                       require.Len(t, decoded.EqualityDeleteFiles, 
len(original.EqualityDeleteFiles))
+                       for i := range original.EqualityDeleteFiles {
+                               require.Equal(t, 
original.EqualityDeleteFiles[i].FilePath(), 
decoded.EqualityDeleteFiles[i].FilePath())
+                       }
+                       require.Len(t, decoded.DeletionVectorFiles, 
len(original.DeletionVectorFiles))

Review Comment:
   This assertion is `0 == 0` — `fullyPopulatedFileScanTask` never sets 
`DeletionVectorFiles`. DV files are the v3 first-class delete mechanism, so the 
whole encode/decode path for them is currently unexercised. Any regression here 
passes silently.
   
   I'd add at least one DV file to the v3 fixture (`EntryContentPosDeletes` + 
`ReferencedDataFile`) and assert `FilePath()` / `ReferencedDataFile()` 
equality, mirroring the other two delete-file loops above.



##########
data_file_codec.go:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "fmt"
+       "reflect"
+       "sync"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/twmb/avro"
+)
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the low-level avro primitive used by the
+// [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this method directly. The receiver MUST decode
+// with [UnmarshalAvroDataFileEntry] and the matching
+// (spec, schema, version) triple.
+//
+// MarshalAvroEntry is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile: it encodes a
+// shallow copy of df's avro-tagged fields, leaving df untouched.
+//
+// distinct_counts round-trips on v1 and v2. The v3 manifest-entry
+// schema omits the field (deprecated in the v3 spec, see
+// apache/iceberg#12182), so it does not survive encode→decode on v3 —
+// callers on v3 that need distinct counts must transport them
+// separately.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, 
version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported 
format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       clone := cloneDataFileAvroFields(d)
+       clone.PartitionData = avroEncodePartitionData(d.Partition(), 
maps.nameToID, maps.idToType)
+
+       return s.Encode(newEncodeEntry(version, clone))
+}
+
+// UnmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// UnmarshalAvroDataFileEntry is the low-level avro primitive used by
+// the [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this function directly.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func UnmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema 
*Schema, version int) (DataFile, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
unsupported format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       entry, df := newDecodeEntry(version)
+       if _, err := s.Decode(data, entry); err != nil {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
%w", err)
+       }
+       df.specID = int32(spec.ID())
+       df.fieldNameToID = maps.nameToID
+       df.fieldIDToLogicalType = maps.idToType
+       df.fieldIDToFixedSize = maps.idToFixedSize
+
+       return df, nil
+}
+
+// newEncodeEntry returns the right manifest-entry shape for the schema
+// version: v1's manifest_entry has a non-nullable snapshot_id and uses
+// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable
+// pointers.
+func newEncodeEntry(version int, df *dataFile) any {
+       if version == 1 {

Review Comment:
   The v1 entry is built with `Snapshot: 0` (the zero value, since 
`fallbackManifestEntry.Snapshot` is a non-nullable `int64`). That's fine for 
the wire-transport use case here, but it means the bytes are not usable as a 
standalone v1 manifest entry — anyone who feeds them through a regular manifest 
reader gets snapshot id 0. Worth a doc note on `EncodeDataFile`: v1 bytes are 
only meaningful via `DecodeDataFile` and carry no snapshot context.



##########
codec/data_file.go:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package codec encodes and decodes iceberg-go values for cross-process
+// transport. The bytes it produces are the same Avro bytes a manifest
+// carries for the corresponding value, so callers transport iceberg
+// internals without inventing a parallel wire schema.
+//
+// EncodeDataFile / DecodeDataFile move a single [iceberg.DataFile]
+// using the manifest-entry encoding for a given partition spec, table
+// schema, and format version.
+//
+// EncodeFileScanTask / DecodeFileScanTask layer on top: each embedded
+// DataFile is encoded with [EncodeDataFile], then wrapped alongside the
+// scan range and v3 row lineage in a small Avro envelope.
+//
+// The receiver supplies (spec, schema, version) out of band. Both sides
+// in a distributed-processing design already hold table metadata, and
+// the per-(specID, version) avro schema is cached.
+package codec
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+// EncodeDataFile encodes a single DataFile for cross-process transport
+// using the manifest-entry Avro encoding for the given partition spec,
+// table schema and format version (1, 2, or 3). The wire format is the
+// same one a manifest carries for this data file. The receiver MUST
+// call [DecodeDataFile] with the matching (spec, schema, version)
+// triple.
+//
+// df must be a DataFile produced by the iceberg-go package (for
+// example via [iceberg.NewDataFileBuilder] or a manifest read).
+// Foreign implementations are rejected.
+//
+// EncodeDataFile is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile.
+//
+// distinct_counts round-trips on v1 and v2. The v3 manifest-entry
+// schema omits the field (deprecated in the v3 spec, see
+// apache/iceberg#12182), so it does not survive encode→decode on v3 —
+// callers on v3 that need distinct counts must transport them
+// separately.
+func EncodeDataFile(df iceberg.DataFile, spec iceberg.PartitionSpec, schema 
*iceberg.Schema, version int) ([]byte, error) {
+       m, ok := df.(avroEntryMarshaler)

Review Comment:
   `iceberg.DataFile` is a public interface and external packages do implement 
it (test doubles, alternate file stores). Rejecting them with "requires the 
iceberg package's DataFile implementation" names the constraint but not how to 
satisfy it — the caller has no path forward short of giving up the interface.
   
   I'd either expose the marshaler interface publicly so external impls can 
satisfy it, or narrow the signature to a concrete type that only this package 
can produce. Hidden runtime constraints on a public interface tend to bite 
people who skim the signature and don't read the doc.



##########
data_file_codec.go:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "fmt"
+       "reflect"
+       "sync"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/twmb/avro"
+)
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the low-level avro primitive used by the
+// [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this method directly. The receiver MUST decode
+// with [UnmarshalAvroDataFileEntry] and the matching
+// (spec, schema, version) triple.
+//
+// MarshalAvroEntry is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile: it encodes a
+// shallow copy of df's avro-tagged fields, leaving df untouched.
+//
+// distinct_counts round-trips on v1 and v2. The v3 manifest-entry
+// schema omits the field (deprecated in the v3 spec, see
+// apache/iceberg#12182), so it does not survive encode→decode on v3 —
+// callers on v3 that need distinct counts must transport them
+// separately.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, 
version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported 
format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       clone := cloneDataFileAvroFields(d)
+       clone.PartitionData = avroEncodePartitionData(d.Partition(), 
maps.nameToID, maps.idToType)
+
+       return s.Encode(newEncodeEntry(version, clone))
+}
+
+// UnmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// UnmarshalAvroDataFileEntry is the low-level avro primitive used by
+// the [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this function directly.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func UnmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema 
*Schema, version int) (DataFile, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
unsupported format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       entry, df := newDecodeEntry(version)
+       if _, err := s.Decode(data, entry); err != nil {
+               return nil, fmt.Errorf("iceberg: UnmarshalAvroDataFileEntry: 
%w", err)
+       }
+       df.specID = int32(spec.ID())
+       df.fieldNameToID = maps.nameToID
+       df.fieldIDToLogicalType = maps.idToType
+       df.fieldIDToFixedSize = maps.idToFixedSize
+
+       return df, nil
+}
+
+// newEncodeEntry returns the right manifest-entry shape for the schema
+// version: v1's manifest_entry has a non-nullable snapshot_id and uses
+// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable
+// pointers.
+func newEncodeEntry(version int, df *dataFile) any {
+       if version == 1 {
+               return &fallbackManifestEntry{
+                       manifestEntry: manifestEntry{EntryStatus: 
EntryStatusADDED, Data: df},
+               }
+       }
+
+       return &manifestEntry{EntryStatus: EntryStatusADDED, Data: df}
+}
+
+// newDecodeEntry mirrors [newEncodeEntry] for the read side: it returns
+// the pointer to pass to avro.Schema.Decode along with the pre-allocated
+// *dataFile that will be populated.
+func newDecodeEntry(version int) (any, *dataFile) {
+       df := &dataFile{}
+       if version == 1 {
+               return &fallbackManifestEntry{manifestEntry: 
manifestEntry{Data: df}}, df
+       }
+
+       return &manifestEntry{Data: df}, df
+}
+
+// cloneDataFileAvroFields returns a fresh *dataFile populated with src's
+// avro-tagged fields. Internal state (sync.Once, lazy-init caches,
+// specID, the field-id lookup maps) is intentionally left at zero
+// values because the avro encoder reads only the avro-tagged fields.
+//
+// Using reflection over the tag set means a new avro-tagged field
+// upstream is auto-copied without an update here — the dataFile struct
+// remains the single source of truth for the wire shape. It also
+// sidesteps the go-vet copies-lock warning that would fire on a
+// struct-literal copy of *dataFile (it embeds sync.Once).
+func cloneDataFileAvroFields(src *dataFile) *dataFile {

Review Comment:
   This is a shallow copy of pointer-typed fields. Things like `ColSizes 
*[]colMap[int, int64]` get their pointer copied — clone and source point at the 
same backing slice. `TestMarshalAvroEntryDoesNotMutate` only checks 
`PartitionData` (which is replaced wholesale before encoding), so the 
column-stats fields aren't covered. The thread-safety claim then rests on 
`twmb/avro` never writing to the slices it's handed during encode — fragile, 
since that's library-internal.
   
   Either document the no-mutation guarantee as covering `PartitionData` only, 
or extend the test to snapshot the pointer-typed fields before/after a 
concurrent encode and assert equality. Also worth confirming 
`TestEncodeDataFileConcurrent` runs under `-race` in CI — without it, byte 
equality across goroutines doesn't actually validate the absence of a data race.



##########
codec/data_file_test.go:
##########
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+       "reflect"
+       "sort"
+       "strconv"
+       "sync"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/codec"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeDataFileRoundTrip(t *testing.T) {
+       for _, version := range []int{1, 2, 3} {
+               t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+                       spec, schema, original := fullyPopulatedDataFile(t, 
version)
+
+                       bytes, err := codec.EncodeDataFile(original, spec, 
schema, version)
+                       require.NoError(t, err)
+                       require.NotEmpty(t, bytes)
+
+                       decoded, err := codec.DecodeDataFile(bytes, spec, 
schema, version)
+                       require.NoError(t, err)
+                       require.NotNil(t, decoded)
+
+                       assertDataFileEqual(t, original, decoded, version)
+               })
+       }
+}
+
+func TestEncodeDataFileRejectsForeignImpl(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFile(t, 2)
+       _, err := codec.EncodeDataFile(stubDataFile{}, spec, schema, 2)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "DataFile implementation")
+}
+
+func TestEncodeDataFileIdempotent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       first, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       second, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       require.Equal(t, first, second, "repeated encodes must produce 
identical bytes")
+}
+
+func TestEncodeDataFileConcurrent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       want, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+
+       const goroutines = 16
+       const iterations = 32
+       results := make(chan []byte, goroutines*iterations)
+       errs := make(chan error, goroutines*iterations)
+       var wg sync.WaitGroup
+       for range goroutines {
+               wg.Go(func() {
+                       for range iterations {
+                               b, err := codec.EncodeDataFile(df, spec, 
schema, 2)
+                               if err != nil {
+                                       errs <- err
+
+                                       return
+                               }
+                               results <- b
+                       }
+               })
+       }
+       wg.Wait()
+       close(results)
+       close(errs)
+
+       for err := range errs {
+               t.Fatalf("concurrent encode failed: %v", err)
+       }
+       for b := range results {
+               require.Equal(t, want, b, "concurrent encodes must produce 
identical bytes")
+       }
+}
+
+func fullyPopulatedDataFile(t *testing.T, version int) (iceberg.PartitionSpec, 
*iceberg.Schema, iceberg.DataFile) {
+       t.Helper()
+       schema := iceberg.NewSchema(123,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.Int64Type{}, Required: true},
+               iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.StringType{}},
+       )
+       spec := iceberg.NewPartitionSpecID(7,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, 
Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+       builder, err := iceberg.NewDataFileBuilder(
+               spec,
+               iceberg.EntryContentData,
+               "s3://bucket/ns/tbl/data/part-0000.parquet",
+               iceberg.ParquetFile,
+               map[int]any{1000: int64(42)},
+               map[int]string{},
+               map[int]int{},
+               1024,
+               1024*1024,
+       )
+       require.NoError(t, err)
+       builder.
+               ColumnSizes(map[int]int64{1: 512, 2: 256}).
+               ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+               NullValueCounts(map[int]int64{1: 0, 2: 4}).
+               NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+               LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+               UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+               SplitOffsets([]int64{0, 4096}).
+               SortOrderID(0).
+               KeyMetadata([]byte("kms-key-1"))
+       if version < 3 {
+               builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})

Review Comment:
   `distinct_counts` (field 111) is marked "Deprecated. Do not write." in the 
spec for all versions, not just v3. Encoded bytes never go to a manifest file 
here, so it's not a correctness bug — but framing v1/v2 distinct-count 
round-trip as a feature in this test invites future callers to set it on new 
DataFiles. I'd reframe as a read-compatibility artifact: counts already present 
on a DataFile (from an old manifest read) survive the round trip, but new 
builder usage shouldn't set them.



##########
data_file_codec.go:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg
+
+import (
+       "fmt"
+       "reflect"
+       "sync"
+
+       "github.com/apache/iceberg-go/internal"
+       "github.com/twmb/avro"
+)
+
+// MarshalAvroEntry encodes this DataFile as Avro bytes using the
+// manifest-entry encoding for the given partition spec, table schema
+// and format version (1, 2, or 3). The wire format is the same one a
+// manifest carries for this data file, so adding a field to the
+// underlying struct (and its avro tags) automatically extends what
+// MarshalAvroEntry transports — there is no separate wire-mirror
+// struct to keep in sync.
+//
+// MarshalAvroEntry is the low-level avro primitive used by the
+// [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this method directly. The receiver MUST decode
+// with [UnmarshalAvroDataFileEntry] and the matching
+// (spec, schema, version) triple.
+//
+// MarshalAvroEntry is non-mutating and safe to call concurrently with
+// any other reader or encoder of the same DataFile: it encodes a
+// shallow copy of df's avro-tagged fields, leaving df untouched.
+//
+// distinct_counts round-trips on v1 and v2. The v3 manifest-entry
+// schema omits the field (deprecated in the v3 spec, see
+// apache/iceberg#12182), so it does not survive encode→decode on v3 —
+// callers on v3 that need distinct counts must transport them
+// separately.
+func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, 
version int) ([]byte, error) {
+       if version < 1 || version > 3 {
+               return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported 
format version %d", version)
+       }
+       s, maps, err := manifestEntrySchemaFor(spec, schema, version)
+       if err != nil {
+               return nil, err
+       }
+       clone := cloneDataFileAvroFields(d)
+       clone.PartitionData = avroEncodePartitionData(d.Partition(), 
maps.nameToID, maps.idToType)
+
+       return s.Encode(newEncodeEntry(version, clone))
+}
+
+// UnmarshalAvroDataFileEntry decodes Avro bytes produced by
+// [(*dataFile).MarshalAvroEntry] back into a DataFile. The
+// (spec, schema, version) triple must match the encoder; passing a
+// different spec or version yields a decode error or silently
+// mis-typed partition values.
+//
+// UnmarshalAvroDataFileEntry is the low-level avro primitive used by
+// the [github.com/apache/iceberg-go/codec] package; callers performing
+// cross-process transport should use that package's high-level API
+// rather than calling this function directly.
+//
+// The returned DataFile carries the partition spec id and the field-id
+// lookup tables, so Partition() and the stats accessors return id-keyed
+// maps as if the file had been read from a manifest.
+func UnmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema 
*Schema, version int) (DataFile, error) {

Review Comment:
   The encode side is `MarshalAvroEntry` (method on the unexported 
`*dataFile`); the decode side is exported as a top-level function — and the doc 
explicitly tells callers not to use it directly. If we don't want callers to 
reach this, I'd unexport it and have `codec.DecodeDataFile` use it via an 
internal bridge. Cleaner than exporting a function whose own doc says don't 
call it.



##########
codec/file_scan_task.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/twmb/avro"
+)
+
+// EncodeFileScanTask encodes a FileScanTask for cross-process transport.
+// Each carried DataFile is encoded with [EncodeDataFile] and wrapped in
+// a small record that also carries the scan range and v3 row lineage.
+// The (spec, schema, version) triple must match what [DecodeFileScanTask]
+// is given on the receiver.
+func EncodeFileScanTask(task table.FileScanTask, spec iceberg.PartitionSpec, 
schema *iceberg.Schema, version int) ([]byte, error) {
+       fileBytes, err := EncodeDataFile(task.File, spec, schema, version)
+       if err != nil {
+               return nil, fmt.Errorf("file: %w", err)
+       }
+       del, err := encodeDataFileSlice(task.DeleteFiles, spec, schema, version)

Review Comment:
   All three delete-file kinds get encoded with the caller-supplied `(spec, 
schema, version)`. After partition evolution, a position-delete file can carry 
a different specID than the data file. Most deployments won't see this, but 
it's worth either reading the specID off each `DataFile` and looking it up 
against table metadata, or at minimum adding a comment calling out the 
assumption.



##########
codec/data_file_test.go:
##########
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec_test
+
+import (
+       "reflect"
+       "sort"
+       "strconv"
+       "sync"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/codec"
+       "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeDataFileRoundTrip(t *testing.T) {
+       for _, version := range []int{1, 2, 3} {
+               t.Run("v"+strconv.Itoa(version), func(t *testing.T) {
+                       spec, schema, original := fullyPopulatedDataFile(t, 
version)
+
+                       bytes, err := codec.EncodeDataFile(original, spec, 
schema, version)
+                       require.NoError(t, err)
+                       require.NotEmpty(t, bytes)
+
+                       decoded, err := codec.DecodeDataFile(bytes, spec, 
schema, version)
+                       require.NoError(t, err)
+                       require.NotNil(t, decoded)
+
+                       assertDataFileEqual(t, original, decoded, version)
+               })
+       }
+}
+
+func TestEncodeDataFileRejectsForeignImpl(t *testing.T) {
+       spec, schema, _ := fullyPopulatedDataFile(t, 2)
+       _, err := codec.EncodeDataFile(stubDataFile{}, spec, schema, 2)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "DataFile implementation")
+}
+
+func TestEncodeDataFileIdempotent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       first, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       second, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+       require.Equal(t, first, second, "repeated encodes must produce 
identical bytes")
+}
+
+func TestEncodeDataFileConcurrent(t *testing.T) {
+       spec, schema, df := fullyPopulatedDataFile(t, 2)
+       want, err := codec.EncodeDataFile(df, spec, schema, 2)
+       require.NoError(t, err)
+
+       const goroutines = 16
+       const iterations = 32
+       results := make(chan []byte, goroutines*iterations)
+       errs := make(chan error, goroutines*iterations)
+       var wg sync.WaitGroup
+       for range goroutines {
+               wg.Go(func() {
+                       for range iterations {
+                               b, err := codec.EncodeDataFile(df, spec, 
schema, 2)
+                               if err != nil {
+                                       errs <- err
+
+                                       return
+                               }
+                               results <- b
+                       }
+               })
+       }
+       wg.Wait()
+       close(results)
+       close(errs)
+
+       for err := range errs {
+               t.Fatalf("concurrent encode failed: %v", err)
+       }
+       for b := range results {
+               require.Equal(t, want, b, "concurrent encodes must produce 
identical bytes")
+       }
+}
+
+func fullyPopulatedDataFile(t *testing.T, version int) (iceberg.PartitionSpec, 
*iceberg.Schema, iceberg.DataFile) {
+       t.Helper()
+       schema := iceberg.NewSchema(123,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.Int64Type{}, Required: true},
+               iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.StringType{}},
+       )
+       spec := iceberg.NewPartitionSpecID(7,
+               iceberg.PartitionField{SourceIDs: []int{1}, FieldID: 1000, 
Name: "id_part", Transform: iceberg.IdentityTransform{}},
+       )
+       builder, err := iceberg.NewDataFileBuilder(
+               spec,
+               iceberg.EntryContentData,
+               "s3://bucket/ns/tbl/data/part-0000.parquet",
+               iceberg.ParquetFile,
+               map[int]any{1000: int64(42)},
+               map[int]string{},
+               map[int]int{},
+               1024,
+               1024*1024,
+       )
+       require.NoError(t, err)
+       builder.
+               ColumnSizes(map[int]int64{1: 512, 2: 256}).
+               ValueCounts(map[int]int64{1: 1024, 2: 1024}).
+               NullValueCounts(map[int]int64{1: 0, 2: 4}).
+               NaNValueCounts(map[int]int64{1: 0, 2: 0}).
+               LowerBoundValues(map[int][]byte{1: {0x01}, 2: []byte("a")}).
+               UpperBoundValues(map[int][]byte{1: {0xff}, 2: []byte("z")}).
+               SplitOffsets([]int64{0, 4096}).
+               SortOrderID(0).
+               KeyMetadata([]byte("kms-key-1"))
+       if version < 3 {
+               builder.DistinctValueCounts(map[int]int64{1: 64, 2: 128})
+       }
+       if version >= 2 {
+               builder.EqualityFieldIDs([]int{1})
+       }
+       if version >= 3 {
+               builder.FirstRowID(0).
+                       
ReferencedDataFile("s3://bucket/ns/tbl/data/source.parquet").
+                       ContentOffset(128).
+                       ContentSizeInBytes(2048)
+       }
+
+       return spec, schema, builder.Build()
+}
+
+func assertDataFileEqual(t *testing.T, want, got iceberg.DataFile, version 
int) {
+       t.Helper()
+       require.Equal(t, want.FilePath(), got.FilePath())
+       require.Equal(t, want.FileFormat(), got.FileFormat())
+       require.Equal(t, want.Partition(), got.Partition())
+       require.Equal(t, want.Count(), got.Count())
+       require.Equal(t, want.FileSizeBytes(), got.FileSizeBytes())
+       require.Equal(t, want.ColumnSizes(), got.ColumnSizes())
+       require.Equal(t, want.ValueCounts(), got.ValueCounts())
+       require.Equal(t, want.NullValueCounts(), got.NullValueCounts())
+       require.Equal(t, want.NaNValueCounts(), got.NaNValueCounts())
+       require.Equal(t, want.LowerBoundValues(), got.LowerBoundValues())
+       require.Equal(t, want.UpperBoundValues(), got.UpperBoundValues())
+       require.Equal(t, want.KeyMetadata(), got.KeyMetadata())
+       require.Equal(t, want.SplitOffsets(), got.SplitOffsets())
+       require.Equal(t, want.SortOrderID(), got.SortOrderID())
+       require.Equal(t, want.SpecID(), got.SpecID())
+       if version < 3 {
+               require.Equal(t, want.DistinctValueCounts(), 
got.DistinctValueCounts())
+       } else {
+               require.Empty(t, got.DistinctValueCounts(),
+                       "v3 manifest-entry schema omits distinct_counts 
(deprecated in spec); "+
+                               "see internal/avro_schemas.go data_file_v3")
+       }
+       if version >= 2 {
+               require.Equal(t, want.ContentType(), got.ContentType())

Review Comment:
   `content` is an avro-tagged field on `dataFile` and lives in the v1 encoded 
bytes, but `ContentType()` is only asserted on `version >= 2`. A regression in 
v1 content-type encoding would slip past. Moving the assertion outside the gate 
is a one-line fix.



##########
codec/file_scan_task.go:
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package codec
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/twmb/avro"
+)
+
+// EncodeFileScanTask encodes a FileScanTask for cross-process transport.
+// Each carried DataFile is encoded with [EncodeDataFile] and wrapped in
+// a small record that also carries the scan range and v3 row lineage.
+// The (spec, schema, version) triple must match what [DecodeFileScanTask]
+// is given on the receiver.
+func EncodeFileScanTask(task table.FileScanTask, spec iceberg.PartitionSpec, 
schema *iceberg.Schema, version int) ([]byte, error) {

Review Comment:
   `EncodeFileScanTask` and `DecodeFileScanTask` don't validate `version` 
before delegating, so an invalid value surfaces as `"file: iceberg: 
MarshalAvroEntry: unsupported format version 0"` — readable, but the function 
name in the error chain doesn't match the caller's entry point. Adding the same 
`if version < 1 || version > 3` guard at the top of both, with a 
`codec:`-prefixed error, keeps things consistent with the rest of the package.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to