laskoviymishka commented on code in PR #1075: URL: https://github.com/apache/iceberg-go/pull/1075#discussion_r3260191663
########## codec/data_file.go: ########## @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package codec encodes and decodes iceberg-go values for cross-process +// transport. The bytes it produces are the same Avro bytes a manifest +// carries for the corresponding value, so callers transport iceberg +// internals without inventing a parallel wire schema. +// +// EncodeDataFile / DecodeDataFile move a single [iceberg.DataFile] +// using the manifest-entry encoding for a given partition spec, table +// schema, and format version. +// +// EncodeFileScanTask / DecodeFileScanTask layer on top: each embedded +// DataFile is encoded with [EncodeDataFile], then wrapped alongside the +// scan range and v3 row lineage in a small Avro envelope. +// +// The receiver supplies (spec, schema, version) out of band. Both sides +// in a distributed-processing design already hold table metadata, and +// the per-(partition-type, version) avro schema is cached. 
+package codec + +import ( + "fmt" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/internal/datafileavro" +) + +// EncodeDataFile encodes a single DataFile for cross-process transport +// using the manifest-entry Avro encoding for the given partition spec, +// table schema and format version (1, 2, or 3). The wire format is the +// same one a manifest carries for this data file. The receiver MUST +// call [DecodeDataFile] with the matching (spec, schema, version) +// triple. +// +// df must implement [iceberg.AvroEntryMarshaler]. The iceberg +// package's built-in DataFile implementation satisfies it; external +// implementations of [iceberg.DataFile] can opt in by implementing +// the marshaler interface themselves. +// +// EncodeDataFile is non-mutating and safe to call concurrently with +// any other reader or encoder of the same DataFile, provided the +// underlying implementation honors that contract. +// +// v1 note: v1 manifest entries carry a non-nullable snapshot_id which +// is written as 0 by the iceberg implementation. v1 bytes are not +// usable as a standalone manifest entry — they only round-trip via +// [DecodeDataFile]. +// +// distinct_counts (field 111) is deprecated in the spec for all +// versions. Already-set values round-trip on v1 and v2 as a +// read-compatibility artifact; v3 omits the field entirely +// (apache/iceberg#12182). New DataFiles should not set distinct +// counts. +func EncodeDataFile(df iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) { + m, ok := df.(iceberg.AvroEntryMarshaler) + if !ok { + return nil, fmt.Errorf("codec: EncodeDataFile requires a DataFile implementing iceberg.AvroEntryMarshaler, got %T", df) + } + + return m.MarshalAvroEntry(spec, schema, version) +} + +// DecodeDataFile decodes bytes produced by [EncodeDataFile] back into a +// DataFile. 
The (spec, schema, version) triple must match the encoder; +// passing a different spec or version yields a decode error or silently +// mis-typed partition values. +// +// The returned DataFile carries the partition spec id and the field-id +// lookup tables, so Partition() and the stats accessors return id-keyed +// maps as if the file had been read from a manifest. +func DecodeDataFile(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (iceberg.DataFile, error) { + res, err := datafileavro.Unmarshal(data, spec, schema, version) Review Comment: Two robustness items at this call site. First, `datafileavro.Unmarshal` is a bare package variable; if a test binary ever imports `codec` without indirectly triggering `iceberg.init()`, the call panics with a nil function pointer and no diagnostics. A `if datafileavro.Unmarshal == nil` guard would make the init-order dependency explicit. Second, `res.(iceberg.DataFile)` on line 92 is unchecked — the two-value form with an `fmt.Errorf("codec: bridge returned unexpected type %T", res)` keeps the failure path informative if the bridge's return contract ever changes. wdyt? ########## data_file_codec.go: ########## @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package iceberg + +import ( + "fmt" + "reflect" + "sync" + + "github.com/apache/iceberg-go/internal" + "github.com/apache/iceberg-go/internal/datafileavro" + "github.com/twmb/avro" +) + +func init() { + datafileavro.Unmarshal = func(data []byte, spec, schema any, version int) (any, error) { + return unmarshalAvroDataFileEntry(data, spec.(PartitionSpec), schema.(*Schema), version) Review Comment: Same defensive point on the writer side of the bridge: `spec.(PartitionSpec)` and `schema.(*Schema)` are unguarded. With `any`-typed parameters there's no compile-time enforcement, so a future caller passing mismatched types gets a panic with no context. Checked assertions plus a wrapped error keep the failure observable. ########## data_file_codec.go: ########## @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package iceberg + +import ( + "fmt" + "reflect" + "sync" + + "github.com/apache/iceberg-go/internal" + "github.com/apache/iceberg-go/internal/datafileavro" + "github.com/twmb/avro" +) + +func init() { Review Comment: Nit: a once-set guard would catch a future split of init responsibility from silently overwriting the bridge. `if datafileavro.Unmarshal != nil { panic("iceberg: datafileavro.Unmarshal already set") }` is the standard pattern for these hooks — fails loudly at startup instead of letting the second writer win. ########## codec/file_scan_task_test.go: ########## @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package codec_test + +import ( + "strconv" + "testing" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/codec" + "github.com/apache/iceberg-go/table" + "github.com/stretchr/testify/require" +) + +func TestEncodeDecodeFileScanTaskRoundTrip(t *testing.T) { + for _, version := range []int{2, 3} { Review Comment: v1 isn't in the round-trip range here. The v1 encode path uses `fallbackManifestEntry` with a non-nullable `snapshot_id` written as 0 — the most divergent branch in the pipeline, currently no integration coverage at the `FileScanTask` envelope level. 
Adding `1` to the version slice with a positional-deletes-only fixture would pin it. ########## codec/file_scan_task.go: ########## @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package codec + +import ( + "fmt" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/twmb/avro" +) + +// EncodeFileScanTask encodes a FileScanTask for cross-process transport. +// Each carried DataFile is encoded with [EncodeDataFile] and wrapped in +// a small record that also carries the scan range and v3 row lineage. +// The (spec, schema, version) triple must match what [DecodeFileScanTask] +// is given on the receiver. +// +// All carried DataFiles (data, positional deletes, equality deletes, +// and deletion vectors) are encoded against the same (spec, schema) +// passed in. After partition evolution, delete files may have been +// written under a different partition spec than the data file; callers +// holding such files must encode them against the spec they were +// written with, typically by partitioning the FileScanTask by per-file +// specID and calling EncodeFileScanTask once per group. 
+func EncodeFileScanTask(task table.FileScanTask, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) { + if version < 1 || version > 3 { + return nil, fmt.Errorf("codec: EncodeFileScanTask: unsupported format version %d", version) + } + fileBytes, err := EncodeDataFile(task.File, spec, schema, version) + if err != nil { + return nil, fmt.Errorf("file: %w", err) + } + del, err := encodeDataFileSlice(task.DeleteFiles, spec, schema, version) + if err != nil { + return nil, fmt.Errorf("delete files: %w", err) + } + eq, err := encodeDataFileSlice(task.EqualityDeleteFiles, spec, schema, version) + if err != nil { + return nil, fmt.Errorf("equality delete files: %w", err) + } + dv, err := encodeDataFileSlice(task.DeletionVectorFiles, spec, schema, version) + if err != nil { + return nil, fmt.Errorf("deletion vector files: %w", err) + } + envelope := fileScanTaskEnvelope{ + File: fileBytes, + DeleteFiles: del, + EqualityDeleteFiles: eq, + DeletionVectorFiles: dv, + Start: task.Start, + Length: task.Length, + FirstRowID: task.FirstRowID, + DataSequenceNumber: task.DataSequenceNumber, + } + + return fileScanTaskSchema.Encode(&envelope) +} + +// DecodeFileScanTask reverses [EncodeFileScanTask]. The triple +// (spec, schema, version) must match the encoder. 
+func DecodeFileScanTask(data []byte, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) (table.FileScanTask, error) { + if version < 1 || version > 3 { + return table.FileScanTask{}, fmt.Errorf("codec: DecodeFileScanTask: unsupported format version %d", version) + } + var envelope fileScanTaskEnvelope + if _, err := fileScanTaskSchema.Decode(data, &envelope); err != nil { + return table.FileScanTask{}, fmt.Errorf("decode: %w", err) + } + file, err := DecodeDataFile(envelope.File, spec, schema, version) + if err != nil { + return table.FileScanTask{}, fmt.Errorf("file: %w", err) + } + del, err := decodeDataFileSlice(envelope.DeleteFiles, spec, schema, version) + if err != nil { + return table.FileScanTask{}, fmt.Errorf("delete files: %w", err) + } + eq, err := decodeDataFileSlice(envelope.EqualityDeleteFiles, spec, schema, version) + if err != nil { + return table.FileScanTask{}, fmt.Errorf("equality delete files: %w", err) + } + dv, err := decodeDataFileSlice(envelope.DeletionVectorFiles, spec, schema, version) + if err != nil { + return table.FileScanTask{}, fmt.Errorf("deletion vector files: %w", err) + } + + return table.FileScanTask{ + File: file, + DeleteFiles: del, + EqualityDeleteFiles: eq, + DeletionVectorFiles: dv, + Start: envelope.Start, + Length: envelope.Length, + FirstRowID: envelope.FirstRowID, + DataSequenceNumber: envelope.DataSequenceNumber, + }, nil +} + +// fileScanTaskShape is a compile-time drift guard for FileScanTask. +// Go only permits struct conversion between two types that have +// identical underlying field sequences (names, types, and order; tags +// are ignored), so the var _ below fails to build the moment +// table.FileScanTask gains, loses, renames, retypes, or reorders a +// field. 
That forces a deliberate decision about whether the change +// must be carried by [EncodeFileScanTask] / [DecodeFileScanTask]; when +// extending, update fileScanTaskEnvelope, the schema JSON below, the +// encode/decode bodies, and this shape together. +type fileScanTaskShape struct { + File iceberg.DataFile + DeleteFiles []iceberg.DataFile + EqualityDeleteFiles []iceberg.DataFile + DeletionVectorFiles []iceberg.DataFile + Start, Length int64 + FirstRowID *int64 + DataSequenceNumber *int64 +} + +var _ = table.FileScanTask(fileScanTaskShape{}) + +// fileScanTaskEnvelope is the avro on-wire shape. The DataFile payloads +// (File and the three delete-file lists) are themselves [EncodeDataFile] +// bytes; this struct only frames them along with the scan range and v3 +// lineage. +type fileScanTaskEnvelope struct { + File []byte `avro:"file"` + DeleteFiles [][]byte `avro:"delete_files"` + EqualityDeleteFiles [][]byte `avro:"equality_delete_files"` + DeletionVectorFiles [][]byte `avro:"deletion_vector_files"` + Start int64 `avro:"start"` + Length int64 `avro:"length"` + FirstRowID *int64 `avro:"first_row_id"` + DataSequenceNumber *int64 `avro:"data_sequence_number"` +} + +const fileScanTaskSchemaJSON = `{ + "type": "record", + "name": "file_scan_task", + "fields": [ + {"name": "file", "type": "bytes"}, + {"name": "delete_files", "type": {"type": "array", "items": "bytes"}}, + {"name": "equality_delete_files", "type": {"type": "array", "items": "bytes"}}, + {"name": "deletion_vector_files", "type": {"type": "array", "items": "bytes"}}, + {"name": "start", "type": "long"}, + {"name": "length", "type": "long"}, + {"name": "first_row_id", "type": ["null", "long"]}, + {"name": "data_sequence_number", "type": ["null", "long"]} + ] +}` + +var fileScanTaskSchema *avro.Schema + +func init() { + s, err := avro.Parse(fileScanTaskSchemaJSON) + if err != nil { + panic("codec: fileScanTaskSchema invalid: " + err.Error()) + } + fileScanTaskSchema = s +} + +func 
encodeDataFileSlice(files []iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([][]byte, error) { Review Comment: The cross-spec point from round 1 didn't fully land. The doc on `EncodeFileScanTask` tells callers to group delete files by per-file specID before calling — but `table.FileScanTask` doesn't expose the per-file specs in a form that supports doing that grouping at the call site without reaching back into table metadata that isn't part of the codec API. A one-liner in `encodeDataFileSlice` (`if df.SpecID() != spec.ID() { return nil, fmt.Errorf(...) }`) converts the silent partition-data corruption path into an explicit error and matches the contract the doc already promises. wdyt? ########## codec/data_file.go: ########## @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package codec encodes and decodes iceberg-go values for cross-process +// transport. The bytes it produces are the same Avro bytes a manifest +// carries for the corresponding value, so callers transport iceberg +// internals without inventing a parallel wire schema. 
+// +// EncodeDataFile / DecodeDataFile move a single [iceberg.DataFile] +// using the manifest-entry encoding for a given partition spec, table +// schema, and format version. +// +// EncodeFileScanTask / DecodeFileScanTask layer on top: each embedded +// DataFile is encoded with [EncodeDataFile], then wrapped alongside the +// scan range and v3 row lineage in a small Avro envelope. +// +// The receiver supplies (spec, schema, version) out of band. Both sides +// in a distributed-processing design already hold table metadata, and +// the per-(partition-type, version) avro schema is cached. +package codec + +import ( + "fmt" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/internal/datafileavro" +) + +// EncodeDataFile encodes a single DataFile for cross-process transport +// using the manifest-entry Avro encoding for the given partition spec, +// table schema and format version (1, 2, or 3). The wire format is the +// same one a manifest carries for this data file. The receiver MUST +// call [DecodeDataFile] with the matching (spec, schema, version) +// triple. +// +// df must implement [iceberg.AvroEntryMarshaler]. The iceberg +// package's built-in DataFile implementation satisfies it; external +// implementations of [iceberg.DataFile] can opt in by implementing +// the marshaler interface themselves. +// +// EncodeDataFile is non-mutating and safe to call concurrently with +// any other reader or encoder of the same DataFile, provided the +// underlying implementation honors that contract. +// +// v1 note: v1 manifest entries carry a non-nullable snapshot_id which +// is written as 0 by the iceberg implementation. v1 bytes are not +// usable as a standalone manifest entry — they only round-trip via +// [DecodeDataFile]. +// +// distinct_counts (field 111) is deprecated in the spec for all +// versions. Already-set values round-trip on v1 and v2 as a +// read-compatibility artifact; v3 omits the field entirely +// (apache/iceberg#12182). 
New DataFiles should not set distinct +// counts. +func EncodeDataFile(df iceberg.DataFile, spec iceberg.PartitionSpec, schema *iceberg.Schema, version int) ([]byte, error) { Review Comment: Two parallel exports for the same operation: `iceberg.EncodeDataFile` in the root package, `codec.EncodeDataFile` here. The wrapper delegates to `MarshalAvroEntry` and surfaces errors like `"iceberg: MarshalAvroEntry: unsupported format version 0"` — exposing the internal symbol to `codec` callers, while `EncodeFileScanTask` in the same package uses `"codec: EncodeFileScanTask: ..."`. Picking one canonical entry point and aligning the error prefix would tidy this. ########## data_file_codec.go: ########## @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package iceberg + +import ( + "fmt" + "reflect" + "sync" + + "github.com/apache/iceberg-go/internal" + "github.com/apache/iceberg-go/internal/datafileavro" + "github.com/twmb/avro" +) + +func init() { + datafileavro.Unmarshal = func(data []byte, spec, schema any, version int) (any, error) { + return unmarshalAvroDataFileEntry(data, spec.(PartitionSpec), schema.(*Schema), version) + } +} + +// AvroEntryMarshaler is implemented by DataFile values that can be +// encoded using the manifest-entry Avro encoding. The iceberg +// package's built-in DataFile implementation satisfies it; external +// implementations can also satisfy it to participate in the +// [github.com/apache/iceberg-go/codec] DataFile codec. +// +// The encoded bytes are the same bytes a manifest carries for this +// data file. Implementations must produce output that the iceberg +// package's manifest-entry Avro decoder accepts. +type AvroEntryMarshaler interface { + MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error) +} + +// MarshalAvroEntry encodes this DataFile as Avro bytes using the +// manifest-entry encoding for the given partition spec, table schema +// and format version (1, 2, or 3). The wire format is the same one a +// manifest carries for this data file, so adding a field to the +// underlying struct (and its avro tags) automatically extends what +// MarshalAvroEntry transports — there is no separate wire-mirror +// struct to keep in sync. +// +// MarshalAvroEntry is the iceberg-package side of the +// [github.com/apache/iceberg-go/codec] DataFile codec; callers +// performing cross-process transport should prefer that package's +// high-level API. +// +// MarshalAvroEntry is safe to call concurrently with any other +// reader or encoder of the same DataFile: a fresh *dataFile is +// cloned (avro-tagged fields only) and the avro encoder reads, but +// does not mutate, the cloned values. 
Pointer-typed avro fields like +// ColSizes share their backing storage with the source; the +// thread-safety guarantee relies on the avro encoder being +// non-mutating. +// +// v1 note: the v1 manifest-entry schema has a non-nullable snapshot_id +// field. MarshalAvroEntry writes 0 there, so v1 bytes are not usable +// as a standalone manifest entry — they only round-trip via the +// matching decoder. +// +// distinct_counts (field 111) is deprecated in the spec for all +// versions. MarshalAvroEntry preserves any value already on the +// source for v1 and v2 as a read-compatibility artifact; v3 omits +// the field entirely (apache/iceberg#12182). New DataFiles should +// not set distinct counts. +func (d *dataFile) MarshalAvroEntry(spec PartitionSpec, schema *Schema, version int) ([]byte, error) { + if version < 1 || version > 3 { + return nil, fmt.Errorf("iceberg: MarshalAvroEntry: unsupported format version %d", version) + } + s, maps, err := manifestEntrySchemaFor(spec, schema, version) + if err != nil { + return nil, err + } + clone := cloneDataFileAvroFields(d) + clone.PartitionData = avroEncodePartitionData(d.Partition(), maps.nameToID, maps.idToType) + + return s.Encode(newEncodeEntry(version, clone)) +} + +// unmarshalAvroDataFileEntry decodes Avro bytes produced by +// [(*dataFile).MarshalAvroEntry] back into a DataFile. The +// (spec, schema, version) triple must match the encoder; passing a +// different spec or version yields a decode error or silently +// mis-typed partition values. +// +// The returned DataFile carries the partition spec id and the field-id +// lookup tables, so Partition() and the stats accessors return id-keyed +// maps as if the file had been read from a manifest. +// +// It is reachable from the [github.com/apache/iceberg-go/codec] +// package through the [datafileavro] bridge. 
+func unmarshalAvroDataFileEntry(data []byte, spec PartitionSpec, schema *Schema, version int) (DataFile, error) { + if version < 1 || version > 3 { + return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: unsupported format version %d", version) + } + s, maps, err := manifestEntrySchemaFor(spec, schema, version) + if err != nil { + return nil, err + } + entry, df := newDecodeEntry(version) + if _, err := s.Decode(data, entry); err != nil { + return nil, fmt.Errorf("iceberg: unmarshalAvroDataFileEntry: %w", err) + } + df.specID = int32(spec.ID()) + df.fieldNameToID = maps.nameToID + df.fieldIDToLogicalType = maps.idToType + df.fieldIDToFixedSize = maps.idToFixedSize + + return df, nil +} + +// newEncodeEntry returns the right manifest-entry shape for the schema +// version: v1's manifest_entry has a non-nullable snapshot_id and uses +// [fallbackManifestEntry], v2/v3 use [manifestEntry] with nullable +// pointers. +func newEncodeEntry(version int, df *dataFile) any { + if version == 1 { + return &fallbackManifestEntry{ + manifestEntry: manifestEntry{EntryStatus: EntryStatusADDED, Data: df}, + } + } + + return &manifestEntry{EntryStatus: EntryStatusADDED, Data: df} +} + +// newDecodeEntry mirrors [newEncodeEntry] for the read side: it returns +// the pointer to pass to avro.Schema.Decode along with the pre-allocated +// *dataFile that will be populated. +func newDecodeEntry(version int) (any, *dataFile) { + df := &dataFile{} + if version == 1 { + return &fallbackManifestEntry{manifestEntry: manifestEntry{Data: df}}, df + } + + return &manifestEntry{Data: df}, df +} + +// cloneDataFileAvroFields returns a fresh *dataFile populated with src's +// avro-tagged fields. Internal state (sync.Once, lazy-init caches, +// specID, the field-id lookup maps) is intentionally left at zero +// values because the avro encoder reads only the avro-tagged fields. 
+// +// Using reflection over the tag set means a new avro-tagged field +// upstream is auto-copied without an update here — the dataFile struct +// remains the single source of truth for the wire shape. It also +// sidesteps the go-vet copies-lock warning that would fire on a +// struct-literal copy of *dataFile (it embeds sync.Once). +// +// Note: this is a shallow copy. Pointer-typed avro fields (ColSizes, +// LowerBounds, etc.) share their backing storage with the source. +// The no-mutation guarantee of MarshalAvroEntry depends on the avro +// encoder being read-only on the values it walks; TestMarshalAvroEntry +// DoesNotMutate asserts this end-to-end across every avro-tagged +// field, so a future regression in the encoder surfaces in tests. +func cloneDataFileAvroFields(src *dataFile) *dataFile { + out := &dataFile{} + srcVal := reflect.ValueOf(src).Elem() + outVal := reflect.ValueOf(out).Elem() + t := srcVal.Type() + for i := 0; i < t.NumField(); i++ { + if _, hasAvroTag := t.Field(i).Tag.Lookup("avro"); hasAvroTag { + outVal.Field(i).Set(srcVal.Field(i)) + } + } + + return out +} + +// avroEncodePartitionData converts an id-keyed partition tuple (carrying +// iceberg-typed values like Date or Decimal) into the name-keyed +// avro-friendly map the manifest-entry schema expects. Idempotent: +// values already in primitive form pass through unchanged. +func avroEncodePartitionData(idKeyed map[int]any, nameToID map[string]int, logicalTypes map[int]string) map[string]any { + converted := avroPartitionData(idKeyed, logicalTypes) + out := make(map[string]any, len(converted)) + for name, id := range nameToID { + if v, ok := converted[id]; ok { + out[name] = v + } + } + + return out +} + +type dataFileFieldMaps struct { + nameToID map[string]int + idToType map[int]string + idToFixedSize map[int]int +} + +// dataFileSchemaCacheKey identifies a cached avro schema by the +// structural fingerprint of its partition type and the format version. 
+// Using the fingerprint (rather than spec.ID()) avoids cross-table +// collisions: two tables that both rely on InitialPartitionSpecID = 0 +// but expose different partition column types receive different +// cached schemas. +type dataFileSchemaCacheKey struct { + partTypeFingerprint string + version int +} + +type dataFileSchemaEntry struct { + schema *avro.Schema + maps dataFileFieldMaps +} + +var dataFileSchemaCache sync.Map + +// manifestEntrySchemaFor returns the cached avro schema and partition +// field-id lookups for the given partition type and format version. +// The cache key uses [StructType.String] as a structural fingerprint +// of the partition type, so two specs that produce different partition +// column types cache under different keys even when they share an id. +func manifestEntrySchemaFor(spec PartitionSpec, schema *Schema, version int) (*avro.Schema, dataFileFieldMaps, error) { + partType := spec.PartitionType(schema) + key := dataFileSchemaCacheKey{partTypeFingerprint: partType.String(), version: version} Review Comment: Nit on the cache key: `StructType.String()` includes field doc strings in the fingerprint. Two structurally identical partition specs with different documentation therefore produce equivalent but distinct cached Avro schema entries, so the cache can fragment without bound in metadata-rich environments. The Avro shape from `partitionTypeToAvroSchema` itself would be a tighter key, but happy to leave as a follow-up.
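To make the cache-key nit concrete, here is a minimal, self-contained sketch of the difference between a doc-sensitive fingerprint and a purely structural one. Everything in it is hypothetical: `field`, `docSensitiveKey`, and `structuralKey` are illustrative stand-ins, not iceberg-go types or functions, and the real fix may well key on the generated Avro shape instead.

```go
package main

import (
	"fmt"
	"strings"
)

// field is a hypothetical stand-in for one partition struct field.
// It is NOT the iceberg-go field type; it only carries what the sketch needs.
type field struct {
	ID   int
	Name string
	Type string
	Doc  string
}

// docSensitiveKey mimics a String()-style fingerprint that also renders the
// doc string, so a documentation-only change yields a different key.
func docSensitiveKey(fields []field) string {
	var b strings.Builder
	for _, f := range fields {
		fmt.Fprintf(&b, "%d:%q:%q:%q;", f.ID, f.Name, f.Type, f.Doc)
	}
	return b.String()
}

// structuralKey fingerprints only the parts that influence the wire shape
// (id, name, type) and deliberately ignores documentation.
func structuralKey(fields []field) string {
	var b strings.Builder
	for _, f := range fields {
		fmt.Fprintf(&b, "%d:%q:%q;", f.ID, f.Name, f.Type)
	}
	return b.String()
}

func main() {
	a := []field{{ID: 1000, Name: "day", Type: "date", Doc: "event day"}}
	b := []field{{ID: 1000, Name: "day", Type: "date", Doc: "ingest day"}}

	// Same structure, different docs: the doc-sensitive keys differ.
	fmt.Println(docSensitiveKey(a) == docSensitiveKey(b)) // false
	// The structural keys match, so both specs would share one cache entry.
	fmt.Println(structuralKey(a) == structuralKey(b)) // true
}
```

With a structural key, the number of cache entries is bounded by the number of distinct partition shapes per format version rather than by how many differently documented copies of a shape exist.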
