zeroshade commented on code in PR #676: URL: https://github.com/apache/iceberg-go/pull/676#discussion_r2688262550
########## puffin/puffin.go: ########## @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +var magic = [4]byte{'P', 'F', 'A', '1'} + +const ( + //[Magic] [FooterPayload] [FooterPayloadSize] [Flags] [Magic] + // MagicSize is the number of bytes in the magic marker. + MagicSize = 4 + // footerTrailerSize accounts for footer length (4)+ flags (4) + trailing magic (4). + footerTrailerSize = 12 + // FooterFlagCompressed indicates a compressed footer; unsupported in this implementation. + FooterFlagCompressed = 1 // bit 0 + + // DefaultMaxBlobSize is the maximum blob size allowed when reading (256 MB). + // This prevents OOM attacks from malicious files with huge blob lengths. + // Override with WithMaxBlobSize when creating a reader. + DefaultMaxBlobSize = 256 << 20 + // CreatedBy is a human-readable identification of the application writing the file, along with its version. + // Example: "Trino version 381". + CreatedBy = "created-by" + // ApacheDataSketchesThetaV1 is a serialized compact Theta sketch from Apache DataSketches. + ApacheDataSketchesThetaV1 = "apache-datasketches-theta-v1" + + // DeletionVectorV1 is a serialized deletion vector according to the Iceberg spec. 
+ DeletionVectorV1 = "deletion-vector-v1" +) + +type BlobMetadata struct { + Type string `json:"type"` + SnapshotID int64 `json:"snapshot-id"` + SequenceNumber int64 `json:"sequence-number"` + Fields []int32 `json:"fields,omitempty"` Review Comment: this is required, remove the `omitempty` ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. 
+// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} Review Comment: rather than just using a string for the "type" can we have a Blob interface or at least specific type like `BlobType` with particular consts? We don't want to allow arbitrary strings here. We only want to allow valid blobs. ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} + +// NewWriter creates a new PuffinWriter and writes the file header magic. +// The caller is responsible for closing the underlying writer after Finish returns. 
+func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, errors.New("puffin: writer is nil") + } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + + return &PuffinWriter{ + w: w, + offset: MagicSize, + props: make(map[string]string), + createdBy: "iceberg-go", + }, nil +} + +// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. +func (w *PuffinWriter) SetProperties(props map[string]string) error { Review Comment: should we have a way to remove properties also? Or a way to clear the current properties? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer Review Comment: why a pointer for the footer? ########## puffin/puffin.go: ########## @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin Review Comment: Can you add a package docstring here to provide package level documentation? Make sure to link to the actual puffin spec! It should be like ```go // under the license // Package puffin is...... // .... // .... package puffin ``` Thanks! ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. 
+type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} + +// NewWriter creates a new PuffinWriter and writes the file header magic. +// The caller is responsible for closing the underlying writer after Finish returns. +func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, errors.New("puffin: writer is nil") + } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + + return &PuffinWriter{ + w: w, + offset: MagicSize, + props: make(map[string]string), + createdBy: "iceberg-go", + }, nil +} + +// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. +func (w *PuffinWriter) SetProperties(props map[string]string) error { + if w.done { + return errors.New("puffin: cannot set properties: writer already finalized") + } + for k, v := range props { + w.props[k] = v + } + + return nil +} + +// SetCreatedBy overrides the default "created-by" property written to the footer. +// The default value is "iceberg-go". Example: "MyApp version 1.2.3". Review Comment: we should probably include the version for the default as `iceberg-go v0.4.0` etc... ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} + +// NewWriter creates a new PuffinWriter and writes the file header magic. 
+// The caller is responsible for closing the underlying writer after Finish returns. +func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, errors.New("puffin: writer is nil") + } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + + return &PuffinWriter{ + w: w, + offset: MagicSize, + props: make(map[string]string), + createdBy: "iceberg-go", + }, nil +} + +// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. +func (w *PuffinWriter) SetProperties(props map[string]string) error { + if w.done { + return errors.New("puffin: cannot set properties: writer already finalized") + } + for k, v := range props { + w.props[k] = v + } + + return nil +} + +// SetCreatedBy overrides the default "created-by" property written to the footer. +// The default value is "iceberg-go". Example: "MyApp version 1.2.3". +func (w *PuffinWriter) SetCreatedBy(createdBy string) error { + if w.done { + return errors.New("puffin: cannot set created-by: writer already finalized") + } + if createdBy == "" { + return errors.New("puffin: cannot set created-by: value cannot be empty") + } + w.createdBy = createdBy + + return nil +} + +// AddBlob writes blob data and records its metadata for the footer. +// Returns the complete BlobMetadata including the computed Offset and Length. +// The input.Type is required; use constants like ApacheDataSketchesThetaV1. 
+func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { + if w.done { + return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized") + } + if input.Type == "" { + return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required") + } + + meta := BlobMetadata{ + Type: input.Type, + SnapshotID: input.SnapshotID, + SequenceNumber: input.SequenceNumber, + Fields: input.Fields, Review Comment: we should validate the other required fields too right? Not just `Type`? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } Review Comment: `bytes.Equal(trailer[8:12], magic[:])` ? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. +func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { Review Comment: since this is in a `puffin` package, just use `NewReader` and `NewWriter` for the functions, avoid the stuttering of `puffin.NewPuffinReader` ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. +func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { Review Comment: is `size` supposed to be the full file size? Could we use https://github.com/apache/arrow-go/blob/v18.5.0/parquet/types.go#L55 or something else to ensure we can get the filesize from the reader? 
At a minimum, we should be documenting what `size` is supposed to be. ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). 
+func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. +func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } Review Comment: something we've learned from Parquet: eagerly read *more* than we will need for the footer in the hopes that we get the whole footer in one shot, *then* process the footer size and so on. In the case of the file being on cloud object storage, we want to reduce the number of round trips we need to do in order to read the file. ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { Review Comment: just name this `Reader` so that we aren't stuttering: we don't want `puffin.PuffinReader` we want `puffin.Reader`, same goes for the writer. ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil Review Comment: is there any reason not to read the footer as part of `NewReader` rather than having a separate `ReadFooter` method? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] 
[footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, errors.New("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. +func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if index < 0 || index >= len(footer.Blobs) { + return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) + } + + meta := footer.Blobs[index] + data, err := r.readBlobData(meta) + if err != nil { + return nil, err + } + + return &BlobData{Metadata: meta, Data: data}, nil +} + +// ReadBlobByMetadata reads a blob using its metadata directly. +// This is useful when you have metadata from an external source. +// The footer must be read first to establish bounds checking. 
+func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { + if r.footer == nil { Review Comment: why do we automatically read the footer for `ReadBlob` but not for this? It would make more sense to just read the footer as part of `NewReader` ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. 
+// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. +func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] 
[footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, errors.New("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. +func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if index < 0 || index >= len(footer.Blobs) { + return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) + } + + meta := footer.Blobs[index] + data, err := r.readBlobData(meta) + if err != nil { + return nil, err + } + + return &BlobData{Metadata: meta, Data: data}, nil +} + +// ReadBlobByMetadata reads a blob using its metadata directly. +// This is useful when you have metadata from an external source. +// The footer must be read first to establish bounds checking. 
+func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { + if r.footer == nil { + return nil, errors.New("puffin: cannot read blob: footer not read") + } + + return r.readBlobData(meta) +} + +// readBlobData is the internal implementation for reading blob data. +func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { + // Validate blob type + if meta.Type == "" { + return nil, errors.New("puffin: cannot read blob: type is required") + } + + // Check for compressed blob (unsupported) + if meta.CompressionCodec != nil && *meta.CompressionCodec != "" { + return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec) + } + + // Validate length + if meta.Length < 0 { + return nil, fmt.Errorf("puffin: invalid blob length %d", meta.Length) + } + if meta.Length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: blob size %d exceeds limit %d", meta.Length, r.maxBlobSize) + } + + // Validate offset (must be after header magic) + if meta.Offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid blob offset %d (before header)", meta.Offset) + } + + // Check for overflow + end := meta.Offset + meta.Length + if end < meta.Offset { + return nil, fmt.Errorf("puffin: blob offset+length overflow: offset=%d length=%d", meta.Offset, meta.Length) + } + + // Validate blob doesn't extend into footer + if end > r.footerStart { + return nil, fmt.Errorf("puffin: blob extends into footer: offset=%d length=%d footerStart=%d", + meta.Offset, meta.Length, r.footerStart) + } + + // Read blob data + data := make([]byte, meta.Length) + if _, err := r.r.ReadAt(data, meta.Offset); err != nil { + return nil, fmt.Errorf("puffin: read blob data: %w", err) + } + + return data, nil +} + +// ReadAllBlobs reads all blobs from the file. +// Blobs are read in offset order for sequential I/O efficiency, +// but results are returned in the original footer order. 
+func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if len(footer.Blobs) == 0 { + return nil, nil + } + + // Create index mapping to preserve original order + type indexedBlob struct { + index int + meta BlobMetadata + } + indexed := make([]indexedBlob, len(footer.Blobs)) + for i, meta := range footer.Blobs { + indexed[i] = indexedBlob{index: i, meta: meta} + } + + // Sort by offset for sequential I/O + sort.Slice(indexed, func(i, j int) bool { + return indexed[i].meta.Offset < indexed[j].meta.Offset + }) + + // Read blobs in offset order, store in original order + results := make([]*BlobData, len(footer.Blobs)) + for _, ib := range indexed { + data, err := r.readBlobData(ib.meta) + if err != nil { + return nil, fmt.Errorf("puffin: read blob %d: %w", ib.index, err) + } + results[ib.index] = &BlobData{Metadata: ib.meta, Data: data} + } + + return results, nil +} + +// ReadRange reads a raw byte range from the blob data region. +// The footer must be read first to establish bounds checking. +func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { Review Comment: what is the actual use of this function? When would a user want to use this? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] 
[footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, errors.New("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. +func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if index < 0 || index >= len(footer.Blobs) { + return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) + } + + meta := footer.Blobs[index] + data, err := r.readBlobData(meta) + if err != nil { + return nil, err + } + + return &BlobData{Metadata: meta, Data: data}, nil +} + +// ReadBlobByMetadata reads a blob using its metadata directly. +// This is useful when you have metadata from an external source. +// The footer must be read first to establish bounds checking. 
+func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { + if r.footer == nil { + return nil, errors.New("puffin: cannot read blob: footer not read") + } + + return r.readBlobData(meta) +} + +// readBlobData is the internal implementation for reading blob data. +func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { + // Validate blob type + if meta.Type == "" { + return nil, errors.New("puffin: cannot read blob: type is required") + } + + // Check for compressed blob (unsupported) + if meta.CompressionCodec != nil && *meta.CompressionCodec != "" { + return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec) + } + + // Validate length + if meta.Length < 0 { + return nil, fmt.Errorf("puffin: invalid blob length %d", meta.Length) + } + if meta.Length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: blob size %d exceeds limit %d", meta.Length, r.maxBlobSize) + } + + // Validate offset (must be after header magic) + if meta.Offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid blob offset %d (before header)", meta.Offset) + } + + // Check for overflow + end := meta.Offset + meta.Length + if end < meta.Offset { + return nil, fmt.Errorf("puffin: blob offset+length overflow: offset=%d length=%d", meta.Offset, meta.Length) + } + + // Validate blob doesn't extend into footer + if end > r.footerStart { + return nil, fmt.Errorf("puffin: blob extends into footer: offset=%d length=%d footerStart=%d", + meta.Offset, meta.Length, r.footerStart) + } + + // Read blob data + data := make([]byte, meta.Length) + if _, err := r.r.ReadAt(data, meta.Offset); err != nil { + return nil, fmt.Errorf("puffin: read blob data: %w", err) + } + + return data, nil +} + +// ReadAllBlobs reads all blobs from the file. +// Blobs are read in offset order for sequential I/O efficiency, +// but results are returned in the original footer order. 
+func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if len(footer.Blobs) == 0 { + return nil, nil + } + + // Create index mapping to preserve original order + type indexedBlob struct { + index int + meta BlobMetadata + } + indexed := make([]indexedBlob, len(footer.Blobs)) + for i, meta := range footer.Blobs { + indexed[i] = indexedBlob{index: i, meta: meta} + } + + // Sort by offset for sequential I/O + sort.Slice(indexed, func(i, j int) bool { + return indexed[i].meta.Offset < indexed[j].meta.Offset + }) + + // Read blobs in offset order, store in original order + results := make([]*BlobData, len(footer.Blobs)) + for _, ib := range indexed { + data, err := r.readBlobData(ib.meta) + if err != nil { + return nil, fmt.Errorf("puffin: read blob %d: %w", ib.index, err) + } + results[ib.index] = &BlobData{Metadata: ib.meta, Data: data} + } + + return results, nil +} + +// ReadRange reads a raw byte range from the blob data region. +// The footer must be read first to establish bounds checking. +func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { + if r.footer == nil { + return nil, errors.New("puffin: cannot read range: footer not read") + } Review Comment: same comment as above ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] 
[footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, errors.New("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } Review Comment: We don't need to copy the bytes; use an io.LimitedReader and a json.Decoder and decode directly from the reader instead. Given that the JSON can be large, avoiding the copy before we unmarshal could be a significant savings. ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. +// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} + +// NewWriter creates a new PuffinWriter and writes the file header magic. +// The caller is responsible for closing the underlying writer after Finish returns. 
+func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, errors.New("puffin: writer is nil") + } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + + return &PuffinWriter{ + w: w, + offset: MagicSize, + props: make(map[string]string), + createdBy: "iceberg-go", + }, nil +} + +// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. +func (w *PuffinWriter) SetProperties(props map[string]string) error { Review Comment: Since this is cumulative, maybe it should be renamed to "AddProperties"? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } + + // Validate payload size + if payloadSize < 0 { + return nil, fmt.Errorf("puffin: invalid footer payload size %d", payloadSize) + } + + // Calculate footer start position + // Layout: [header magic (4)] [blobs...] 
[footer magic (4)] [JSON (payloadSize)] [trailer (12)] + footerStart := r.size - footerTrailerSize - payloadSize - MagicSize + if footerStart < MagicSize { + return nil, fmt.Errorf("puffin: footer payload size %d exceeds available space", payloadSize) + } + + // Validate footer start magic + var footerMagic [MagicSize]byte + if _, err := r.r.ReadAt(footerMagic[:], footerStart); err != nil { + return nil, fmt.Errorf("puffin: read footer start magic: %w", err) + } + if footerMagic != magic { + return nil, errors.New("puffin: invalid footer start magic") + } + + // Read footer JSON payload + payload := make([]byte, payloadSize) + if _, err := r.r.ReadAt(payload, footerStart+MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read footer payload: %w", err) + } + + // Parse footer JSON + var footer Footer + if err := json.Unmarshal(payload, &footer); err != nil { + return nil, fmt.Errorf("puffin: decode footer JSON: %w", err) + } + + // Validate blob metadata + if err := r.validateBlobs(footer.Blobs, footerStart); err != nil { + return nil, err + } + + // Cache footer and footerStart + r.footer = &footer + r.footerStart = footerStart + + return r.footer, nil +} + +// ReadBlob reads the content of a specific blob by index. +// The footer is read automatically if not already cached. +func (r *PuffinReader) ReadBlob(index int) (*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if index < 0 || index >= len(footer.Blobs) { + return nil, fmt.Errorf("puffin: blob index %d out of range [0, %d)", index, len(footer.Blobs)) + } + + meta := footer.Blobs[index] + data, err := r.readBlobData(meta) + if err != nil { + return nil, err + } + + return &BlobData{Metadata: meta, Data: data}, nil +} + +// ReadBlobByMetadata reads a blob using its metadata directly. +// This is useful when you have metadata from an external source. +// The footer must be read first to establish bounds checking. 
+func (r *PuffinReader) ReadBlobByMetadata(meta BlobMetadata) ([]byte, error) { + if r.footer == nil { + return nil, errors.New("puffin: cannot read blob: footer not read") + } + + return r.readBlobData(meta) +} + +// readBlobData is the internal implementation for reading blob data. +func (r *PuffinReader) readBlobData(meta BlobMetadata) ([]byte, error) { + // Validate blob type + if meta.Type == "" { + return nil, errors.New("puffin: cannot read blob: type is required") + } + + // Check for compressed blob (unsupported) + if meta.CompressionCodec != nil && *meta.CompressionCodec != "" { + return nil, fmt.Errorf("puffin: cannot read blob: compression %q not supported", *meta.CompressionCodec) + } + + // Validate length + if meta.Length < 0 { + return nil, fmt.Errorf("puffin: invalid blob length %d", meta.Length) + } + if meta.Length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: blob size %d exceeds limit %d", meta.Length, r.maxBlobSize) + } + + // Validate offset (must be after header magic) + if meta.Offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid blob offset %d (before header)", meta.Offset) + } + + // Check for overflow + end := meta.Offset + meta.Length + if end < meta.Offset { + return nil, fmt.Errorf("puffin: blob offset+length overflow: offset=%d length=%d", meta.Offset, meta.Length) + } + + // Validate blob doesn't extend into footer + if end > r.footerStart { + return nil, fmt.Errorf("puffin: blob extends into footer: offset=%d length=%d footerStart=%d", + meta.Offset, meta.Length, r.footerStart) + } + + // Read blob data + data := make([]byte, meta.Length) + if _, err := r.r.ReadAt(data, meta.Offset); err != nil { + return nil, fmt.Errorf("puffin: read blob data: %w", err) + } + + return data, nil +} + +// ReadAllBlobs reads all blobs from the file. +// Blobs are read in offset order for sequential I/O efficiency, +// but results are returned in the original footer order. 
+func (r *PuffinReader) ReadAllBlobs() ([]*BlobData, error) { + footer, err := r.ReadFooter() + if err != nil { + return nil, err + } + + if len(footer.Blobs) == 0 { + return nil, nil + } + + // Create index mapping to preserve original order + type indexedBlob struct { + index int + meta BlobMetadata + } + indexed := make([]indexedBlob, len(footer.Blobs)) + for i, meta := range footer.Blobs { + indexed[i] = indexedBlob{index: i, meta: meta} + } + + // Sort by offset for sequential I/O + sort.Slice(indexed, func(i, j int) bool { + return indexed[i].meta.Offset < indexed[j].meta.Offset + }) + + // Read blobs in offset order, store in original order + results := make([]*BlobData, len(footer.Blobs)) + for _, ib := range indexed { + data, err := r.readBlobData(ib.meta) + if err != nil { + return nil, fmt.Errorf("puffin: read blob %d: %w", ib.index, err) + } + results[ib.index] = &BlobData{Metadata: ib.meta, Data: data} + } + + return results, nil +} + +// ReadRange reads a raw byte range from the blob data region. +// The footer must be read first to establish bounds checking. +func (r *PuffinReader) ReadRange(offset, length int64) ([]byte, error) { + if r.footer == nil { + return nil, errors.New("puffin: cannot read range: footer not read") + } + + // Validate length + if length < 0 { + return nil, fmt.Errorf("puffin: invalid range length %d", length) + } + if length > r.maxBlobSize { + return nil, fmt.Errorf("puffin: range size %d exceeds limit %d", length, r.maxBlobSize) + } + + // Validate offset + if offset < MagicSize { + return nil, fmt.Errorf("puffin: invalid range offset %d (before header)", offset) + } Review Comment: these checks are duplicated, as are the ones below. Maybe make this an internal method for reading the data in general? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) + if size < minSize { + return nil, fmt.Errorf("puffin: file too small (%d bytes, minimum %d)", size, minSize) + } + + // Validate header magic + var headerMagic [MagicSize]byte + if _, err := r.ReadAt(headerMagic[:], 0); err != nil { + return nil, fmt.Errorf("puffin: read header magic: %w", err) + } + if headerMagic != magic { + return nil, errors.New("puffin: invalid header magic") + } + + // Validate trailing magic (fail fast on corrupt/truncated files) + var trailingMagic [MagicSize]byte + if _, err := r.ReadAt(trailingMagic[:], size-MagicSize); err != nil { + return nil, fmt.Errorf("puffin: read trailing magic: %w", err) + } + if trailingMagic != magic { + return nil, errors.New("puffin: invalid trailing magic") + } + + pr := &PuffinReader{ + r: r, + size: size, + maxBlobSize: DefaultMaxBlobSize, + } + + for _, opt := range opts { + opt(pr) + } + + return pr, nil +} + +// ReadFooter reads and parses the footer from the Puffin file. +// The footer is cached after the first successful read. 
+func (r *PuffinReader) ReadFooter() (*Footer, error) { + if r.footer != nil { + return r.footer, nil + } + + // Read trailer (last 12 bytes): PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + if _, err := r.r.ReadAt(trailer[:], r.size-footerTrailerSize); err != nil { + return nil, fmt.Errorf("puffin: read footer trailer: %w", err) + } + + // Validate trailing magic (already checked in constructor, but be defensive) + if trailer[8] != magic[0] || trailer[9] != magic[1] || + trailer[10] != magic[2] || trailer[11] != magic[3] { + return nil, errors.New("puffin: invalid trailing magic in footer") + } + + // Extract payload size and flags + payloadSize := int64(binary.LittleEndian.Uint32(trailer[0:4])) + flags := binary.LittleEndian.Uint32(trailer[4:8]) + + // Check for compressed footer (unsupported) + if flags&FooterFlagCompressed != 0 { + return nil, errors.New("puffin: compressed footer not supported") + } + + // Check for unknown flags (future-proofing) + if flags&^uint32(FooterFlagCompressed) != 0 { + return nil, fmt.Errorf("puffin: unknown footer flags set: 0x%x", flags) + } Review Comment: do we want to error on these? Or just ignore unknown flags? ########## puffin/puffin_reader.go: ########## @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "sort" +) + +// PuffinReader +// +// Usage: +// +// r, err := puffin.NewPuffinReader(file, fileSize) +// if err != nil { +// return err +// } +// footer, err := r.ReadFooter() +// if err != nil { +// return err +// } +// for i := range footer.Blobs { +// blob, err := r.ReadBlob(i) +// // process blob.Data +// } +type PuffinReader struct { + r io.ReaderAt + size int64 + footer *Footer + footerStart int64 // cached after ReadFooter + maxBlobSize int64 +} + +// BlobData pairs a blob's metadata with its content. +type BlobData struct { + Metadata BlobMetadata + Data []byte +} + +// ReaderOption configures a PuffinReader. +type ReaderOption func(*PuffinReader) + +// WithMaxBlobSize sets the maximum blob size allowed when reading. +// This prevents OOM attacks from malicious files with huge blob lengths. +// Default is DefaultMaxBlobSize (256 MB). +func WithMaxBlobSize(size int64) ReaderOption { + return func(r *PuffinReader) { + r.maxBlobSize = size + } +} + +// NewPuffinReader creates a new Puffin reader. +// It validates both the header and trailing magic bytes upfront. +// The caller is responsible for closing the underlying io.ReaderAt. 
+func NewPuffinReader(r io.ReaderAt, size int64, opts ...ReaderOption) (*PuffinReader, error) { + if r == nil { + return nil, errors.New("puffin: reader is nil") + } + + // Minimum size: header magic + footer magic + footer trailer + // [Magic] + zero for blob + [Magic] + [FooterPayloadSize (assuming ~0)] + [Flags] + [Magic] + minSize := int64(MagicSize + MagicSize + footerTrailerSize) Review Comment: the minimum size has MagicSize * 3, you're missing one in here :smile: ########## puffin/puffin_writer.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package puffin + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "math" +) + +// writeAll writes all bytes to w or returns an error. +// Handles the io.Writer contract where Write can return n < len(data) without error. +func writeAll(w io.Writer, data []byte) error { + n, err := w.Write(data) + if err != nil { + return err + } + if n != len(data) { + return fmt.Errorf("short write: wrote %d of %d bytes", n, len(data)) + } + + return nil +} + +// PuffinWriter writes blobs and metadata to a Puffin file. 
+// +// The Puffin format stores arbitrary blobs (e.g., statistics, deletion vectors) +// along with JSON metadata in a footer. The writer handles the binary layout: +// header magic, blob data, and footer structure. +// +// Usage: +// +// w, err := puffin.NewWriter(file) +// if err != nil { +// return err +// } +// _, err = w.AddBlob(puffin.BlobMetadataInput{ +// Type: puffin.ApacheDataSketchesThetaV1, +// SnapshotID: 123, +// Fields: []int32{1}, +// }, sketchBytes) +// if err != nil { +// return err +// } +// return w.Finish() +type PuffinWriter struct { + w io.Writer + offset int64 + blobs []BlobMetadata + props map[string]string + done bool + createdBy string +} + +// BlobMetadataInput contains fields the caller provides when adding a blob. +// Offset, Length, and CompressionCodec are set by the writer. +type BlobMetadataInput struct { + Type string + SnapshotID int64 + SequenceNumber int64 + Fields []int32 + Properties map[string]string +} + +// NewWriter creates a new PuffinWriter and writes the file header magic. +// The caller is responsible for closing the underlying writer after Finish returns. +func NewWriter(w io.Writer) (*PuffinWriter, error) { + if w == nil { + return nil, errors.New("puffin: writer is nil") + } + + // Write header magic bytes + if err := writeAll(w, magic[:]); err != nil { + return nil, fmt.Errorf("puffin: write header magic: %w", err) + } + + return &PuffinWriter{ + w: w, + offset: MagicSize, + props: make(map[string]string), + createdBy: "iceberg-go", + }, nil +} + +// SetProperties merges the provided properties into the file-level properties +// written to the footer. Can be called multiple times before Finish. +func (w *PuffinWriter) SetProperties(props map[string]string) error { + if w.done { + return errors.New("puffin: cannot set properties: writer already finalized") + } + for k, v := range props { + w.props[k] = v + } + + return nil +} + +// SetCreatedBy overrides the default "created-by" property written to the footer. 
+// The default value is "iceberg-go". Example: "MyApp version 1.2.3". +func (w *PuffinWriter) SetCreatedBy(createdBy string) error { + if w.done { + return errors.New("puffin: cannot set created-by: writer already finalized") + } + if createdBy == "" { + return errors.New("puffin: cannot set created-by: value cannot be empty") + } + w.createdBy = createdBy + + return nil +} + +// AddBlob writes blob data and records its metadata for the footer. +// Returns the complete BlobMetadata including the computed Offset and Length. +// The input.Type is required; use constants like ApacheDataSketchesThetaV1. +func (w *PuffinWriter) AddBlob(input BlobMetadataInput, data []byte) (BlobMetadata, error) { + if w.done { + return BlobMetadata{}, errors.New("puffin: cannot add blob: writer already finalized") + } + if input.Type == "" { + return BlobMetadata{}, errors.New("puffin: cannot add blob: type is required") + } + + meta := BlobMetadata{ + Type: input.Type, + SnapshotID: input.SnapshotID, + SequenceNumber: input.SequenceNumber, + Fields: input.Fields, + Offset: w.offset, + Length: int64(len(data)), + Properties: input.Properties, + } + + if err := writeAll(w.w, data); err != nil { + return BlobMetadata{}, fmt.Errorf("puffin: write blob: %w", err) + } + + w.offset += meta.Length + w.blobs = append(w.blobs, meta) + + return meta, nil +} + +// Finish writes the footer and completes the Puffin file structure. +// Must be called exactly once after all blobs are written. +// After Finish returns, no further operations are allowed on the writer. 
+func (w *PuffinWriter) Finish() error { + if w.done { + return errors.New("puffin: cannot finish: writer already finalized") + } + + // Build footer + footer := Footer{ + Blobs: w.blobs, + Properties: w.props, + } + if footer.Properties == nil { + footer.Properties = make(map[string]string) + } + if w.createdBy != "" { + footer.Properties[CreatedBy] = w.createdBy + } + + payload, err := json.Marshal(footer) + if err != nil { + return fmt.Errorf("puffin: marshal footer: %w", err) + } + + // Check footer size fits in int32 + if len(payload) > math.MaxInt32 { + return fmt.Errorf("puffin: footer too large: %d bytes exceeds 2GB limit", len(payload)) + } + + // Write footer start magic + if err := writeAll(w.w, magic[:]); err != nil { + return fmt.Errorf("puffin: write footer magic: %w", err) + } + + // Write footer payload + if err := writeAll(w.w, payload); err != nil { + return fmt.Errorf("puffin: write footer payload: %w", err) + } + + // Write trailer: PayloadSize(4) + Flags(4) + Magic(4) + var trailer [footerTrailerSize]byte + binary.LittleEndian.PutUint32(trailer[0:4], uint32(len(payload))) + binary.LittleEndian.PutUint32(trailer[4:8], 0) // flags = 0 (uncompressed) Review Comment: do we want to support compression? Or do that in a follow-up? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
