This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 95c4e4d  feat(arrow/ipc): implement lazy loading/zero-copy for IPC files (#216)
95c4e4d is described below

commit 95c4e4d1297b8413cbf208a435088022586cdfc9
Author: Matt Topol <[email protected]>
AuthorDate: Wed Dec 18 14:57:45 2024 -0500

    feat(arrow/ipc): implement lazy loading/zero-copy for IPC files (#216)
    
    ### Rationale for this change
    closes #207
    
    ### What changes are included in this PR?
    Adds a new function `NewMappedFileReader` to the ipc package which
    accepts a byte slice instead of a `ReadAtSeeker`. Updates `ipcSource` to
    reference the raw byte slices from the input directly instead of
    wrapping them with `bytes.NewReader`, which forces copies via `Read`,
    `ReadFull`, etc.
    
    ### Are these changes tested?
    Unit tests added to confirm that the pointers match and that we aren't
    allocating unnecessarily.
    
    ### Are there any user-facing changes?
    Adds the new exported `NewMappedFileReader` function. Otherwise there
    shouldn't be any user-facing changes other than a reduction in memory
    usage when reading non-compressed IPC data.
---
 arrow/ipc/file_reader.go   | 392 ++++++++++++++++++++++++++++++---------------
 arrow/ipc/file_writer.go   |   8 +-
 arrow/ipc/metadata.go      |  24 +--
 arrow/ipc/metadata_test.go |  24 +--
 arrow/ipc/reader.go        |   7 +-
 arrow/ipc/reader_test.go   |  74 +++++++++
 6 files changed, 369 insertions(+), 160 deletions(-)

diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index 2715831..9135529 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -33,15 +33,127 @@ import (
        "github.com/apache/arrow-go/v18/arrow/memory"
 )
 
-// FileReader is an Arrow file reader.
-type FileReader struct {
+type readerImpl interface {
+       getFooterEnd() (int64, error)
+       getBytes(offset, length int64) ([]byte, error)
+       dict(memory.Allocator, *footerBlock, int) (dataBlock, error)
+       block(memory.Allocator, *footerBlock, int) (dataBlock, error)
+}
+
+type footerBlock struct {
+       offset int64
+       buffer *memory.Buffer
+       data   *flatbuf.Footer
+}
+
+type dataBlock interface {
+       Offset() int64
+       Meta() int32
+       Body() int64
+       NewMessage() (*Message, error)
+}
+
+const footerSizeLen = 4
+
+var minimumOffsetSize = int64(len(Magic)*2 + footerSizeLen)
+
+type basicReaderImpl struct {
        r ReadAtSeeker
+}
 
-       footer struct {
-               offset int64
-               buffer *memory.Buffer
-               data   *flatbuf.Footer
+func (r *basicReaderImpl) getBytes(offset, len int64) ([]byte, error) {
+       buf := make([]byte, len)
+       n, err := r.r.ReadAt(buf, offset)
+       if err != nil {
+               return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at 
offset %d: %w", len, offset, err)
+       }
+       if int64(n) != len {
+               return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at 
offset %d", len, offset)
        }
+       return buf, nil
+}
+
+func (r *basicReaderImpl) getFooterEnd() (int64, error) {
+       return r.r.Seek(0, io.SeekEnd)
+}
+
+func (r *basicReaderImpl) block(mem memory.Allocator, f *footerBlock, i int) 
(dataBlock, error) {
+       var blk flatbuf.Block
+       if !f.data.RecordBatches(&blk, i) {
+               return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract 
file block %d", i)
+       }
+
+       return fileBlock{
+               offset: blk.Offset(),
+               meta:   blk.MetaDataLength(),
+               body:   blk.BodyLength(),
+               r:      r.r,
+               mem:    mem,
+       }, nil
+}
+
+func (r *basicReaderImpl) dict(mem memory.Allocator, f *footerBlock, i int) 
(dataBlock, error) {
+       var blk flatbuf.Block
+       if !f.data.Dictionaries(&blk, i) {
+               return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract 
dictionary block %d", i)
+       }
+
+       return fileBlock{
+               offset: blk.Offset(),
+               meta:   blk.MetaDataLength(),
+               body:   blk.BodyLength(),
+               r:      r.r,
+               mem:    mem,
+       }, nil
+}
+
+type mappedReaderImpl struct {
+       data []byte
+}
+
+func (r *mappedReaderImpl) getBytes(offset, length int64) ([]byte, error) {
+       if offset < 0 || offset+int64(length) > int64(len(r.data)) {
+               return nil, fmt.Errorf("arrow/ipc: invalid offset=%d or 
length=%d", offset, length)
+       }
+
+       return r.data[offset : offset+length], nil
+}
+
+func (r *mappedReaderImpl) getFooterEnd() (int64, error) { return 
int64(len(r.data)), nil }
+
+func (r *mappedReaderImpl) block(_ memory.Allocator, f *footerBlock, i int) 
(dataBlock, error) {
+       var blk flatbuf.Block
+       if !f.data.RecordBatches(&blk, i) {
+               return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not 
extract file block %d", i)
+       }
+
+       return mappedFileBlock{
+               offset: blk.Offset(),
+               meta:   blk.MetaDataLength(),
+               body:   blk.BodyLength(),
+               data:   r.data,
+       }, nil
+}
+
+func (r *mappedReaderImpl) dict(_ memory.Allocator, f *footerBlock, i int) 
(dataBlock, error) {
+       var blk flatbuf.Block
+       if !f.data.Dictionaries(&blk, i) {
+               return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not 
extract dictionary block %d", i)
+       }
+
+       return mappedFileBlock{
+               offset: blk.Offset(),
+               meta:   blk.MetaDataLength(),
+               body:   blk.BodyLength(),
+               data:   r.data,
+       }, nil
+}
+
+// FileReader is an Arrow file reader.
+type FileReader struct {
+       r readerImpl
+
+       footer footerBlock
 
        // fields dictTypeMap
        memo dictutils.Memo
@@ -56,81 +168,71 @@ type FileReader struct {
        swapEndianness bool
 }
 
+// NewMappedFileReader is like NewFileReader but instead of using a 
ReadAtSeeker,
+// which will force copies through the Read/ReadAt methods, it uses a byte 
slice
+// and pulls slices directly from the data. This is useful specifically when
+// dealing with mmapped data so that you can lazily load the buffers and avoid
+// extraneous copies. The slices used for the record column buffers will simply
+// reference the existing data instead of performing copies via ReadAt/Read.
+//
+// For example, syscall.Mmap returns a byte slice which could be referencing
+// a shared memory region or otherwise a memory-mapped file.
+func NewMappedFileReader(data []byte, opts ...Option) (*FileReader, error) {
+       var (
+               cfg = newConfig(opts...)
+               f   = FileReader{
+                       r:   &mappedReaderImpl{data: data},
+                       mem: cfg.alloc,
+               }
+       )
+
+       if err := f.init(cfg); err != nil {
+               return nil, err
+       }
+       return &f, nil
+}
+
 // NewFileReader opens an Arrow file using the provided reader r.
 func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
        var (
                cfg = newConfig(opts...)
-               err error
-
-               f = FileReader{
-                       r:    r,
+               f   = FileReader{
+                       r:    &basicReaderImpl{r: r},
                        memo: dictutils.NewMemo(),
                        mem:  cfg.alloc,
                }
        )
 
+       if err := f.init(cfg); err != nil {
+               return nil, err
+       }
+       return &f, nil
+}
+
+func (f *FileReader) init(cfg *config) error {
+       var err error
        if cfg.footer.offset <= 0 {
-               cfg.footer.offset, err = f.r.Seek(0, io.SeekEnd)
+               cfg.footer.offset, err = f.r.getFooterEnd()
                if err != nil {
-                       return nil, fmt.Errorf("arrow/ipc: could retrieve 
footer offset: %w", err)
+                       return fmt.Errorf("arrow/ipc: could retrieve footer 
offset: %w", err)
                }
        }
        f.footer.offset = cfg.footer.offset
 
        err = f.readFooter()
        if err != nil {
-               return nil, fmt.Errorf("arrow/ipc: could not decode footer: 
%w", err)
+               return fmt.Errorf("arrow/ipc: could not decode footer: %w", err)
        }
 
        err = f.readSchema(cfg.ensureNativeEndian)
        if err != nil {
-               return nil, fmt.Errorf("arrow/ipc: could not decode schema: 
%w", err)
+               return fmt.Errorf("arrow/ipc: could not decode schema: %w", err)
        }
 
        if cfg.schema != nil && !cfg.schema.Equal(f.schema) {
-               return nil, fmt.Errorf("arrow/ipc: inconsistent schema for 
reading (got: %v, want: %v)", f.schema, cfg.schema)
-       }
-
-       return &f, err
-}
-
-func (f *FileReader) readFooter() error {
-       var err error
-
-       if f.footer.offset <= int64(len(Magic)*2+4) {
-               return fmt.Errorf("arrow/ipc: file too small (size=%d)", 
f.footer.offset)
-       }
-
-       eof := int64(len(Magic) + 4)
-       buf := make([]byte, eof)
-       n, err := f.r.ReadAt(buf, f.footer.offset-eof)
-       if err != nil {
-               return fmt.Errorf("arrow/ipc: could not read footer: %w", err)
-       }
-       if n != len(buf) {
-               return fmt.Errorf("arrow/ipc: could not read %d bytes from end 
of file", len(buf))
+               return fmt.Errorf("arrow/ipc: inconsistent schema for reading 
(got: %v, want: %v)", f.schema, cfg.schema)
        }
 
-       if !bytes.Equal(buf[4:], Magic) {
-               return errNotArrowFile
-       }
-
-       size := int64(binary.LittleEndian.Uint32(buf[:4]))
-       if size <= 0 || size+int64(len(Magic)*2+4) > f.footer.offset {
-               return errInconsistentFileMetadata
-       }
-
-       buf = make([]byte, size)
-       n, err = f.r.ReadAt(buf, f.footer.offset-size-eof)
-       if err != nil {
-               return fmt.Errorf("arrow/ipc: could not read footer data: %w", 
err)
-       }
-       if n != len(buf) {
-               return fmt.Errorf("arrow/ipc: could not read %d bytes from 
footer data", len(buf))
-       }
-
-       f.footer.buffer = memory.NewBufferBytes(buf)
-       f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
        return err
 }
 
@@ -155,17 +257,17 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) 
error {
        }
 
        for i := 0; i < f.NumDictionaries(); i++ {
-               blk, err := f.dict(i)
+               blk, err := f.r.dict(f.mem, &f.footer, i)
                if err != nil {
                        return fmt.Errorf("arrow/ipc: could not read 
dictionary[%d]: %w", i, err)
                }
                switch {
-               case !bitutil.IsMultipleOf8(blk.Offset):
-                       return fmt.Errorf("arrow/ipc: invalid file offset=%d 
for dictionary %d", blk.Offset, i)
-               case !bitutil.IsMultipleOf8(int64(blk.Meta)):
-                       return fmt.Errorf("arrow/ipc: invalid file metadata=%d 
position for dictionary %d", blk.Meta, i)
-               case !bitutil.IsMultipleOf8(blk.Body):
-                       return fmt.Errorf("arrow/ipc: invalid file body=%d 
position for dictionary %d", blk.Body, i)
+               case !bitutil.IsMultipleOf8(blk.Offset()):
+                       return fmt.Errorf("arrow/ipc: invalid file offset=%d 
for dictionary %d", blk.Offset(), i)
+               case !bitutil.IsMultipleOf8(int64(blk.Meta())):
+                       return fmt.Errorf("arrow/ipc: invalid file metadata=%d 
position for dictionary %d", blk.Meta(), i)
+               case !bitutil.IsMultipleOf8(blk.Body()):
+                       return fmt.Errorf("arrow/ipc: invalid file body=%d 
position for dictionary %d", blk.Body(), i)
                }
 
                msg, err := blk.NewMessage()
@@ -173,7 +275,7 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) 
error {
                        return err
                }
 
-               kind, err = readDictionary(&f.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem)
+               kind, err = readDictionary(&f.memo, msg.meta, msg.body, 
f.swapEndianness, f.mem)
                if err != nil {
                        return err
                }
@@ -185,34 +287,34 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) 
error {
        return err
 }
 
-func (f *FileReader) block(i int) (fileBlock, error) {
-       var blk flatbuf.Block
-       if !f.footer.data.RecordBatches(&blk, i) {
-               return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract 
file block %d", i)
+func (f *FileReader) readFooter() error {
+       if f.footer.offset <= minimumOffsetSize {
+               return fmt.Errorf("arrow/ipc: file too small (size=%d)", 
f.footer.offset)
        }
 
-       return fileBlock{
-               Offset: blk.Offset(),
-               Meta:   blk.MetaDataLength(),
-               Body:   blk.BodyLength(),
-               r:      f.r,
-               mem:    f.mem,
-       }, nil
-}
+       eof := int64(len(Magic) + footerSizeLen)
+       buf, err := f.r.getBytes(f.footer.offset-eof, eof)
+       if err != nil {
+               return err
+       }
 
-func (f *FileReader) dict(i int) (fileBlock, error) {
-       var blk flatbuf.Block
-       if !f.footer.data.Dictionaries(&blk, i) {
-               return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract 
dictionary block %d", i)
+       if !bytes.Equal(buf[4:], Magic) {
+               return errNotArrowFile
        }
 
-       return fileBlock{
-               Offset: blk.Offset(),
-               Meta:   blk.MetaDataLength(),
-               Body:   blk.BodyLength(),
-               r:      f.r,
-               mem:    f.mem,
-       }, nil
+       size := int64(binary.LittleEndian.Uint32(buf[:footerSizeLen]))
+       if size <= 0 || size+minimumOffsetSize > f.footer.offset {
+               return errInconsistentFileMetadata
+       }
+
+       buf, err = f.r.getBytes(f.footer.offset-size-eof, size)
+       if err != nil {
+               return err
+       }
+
+       f.footer.buffer = memory.NewBufferBytes(buf)
+       f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
+       return nil
 }
 
 func (f *FileReader) Schema() *arrow.Schema {
@@ -278,17 +380,17 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, 
error) {
                panic("arrow/ipc: record index out of bounds")
        }
 
-       blk, err := f.block(i)
+       blk, err := f.r.block(f.mem, &f.footer, i)
        if err != nil {
                return nil, err
        }
        switch {
-       case !bitutil.IsMultipleOf8(blk.Offset):
-               return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for 
record %d", blk.Offset, i)
-       case !bitutil.IsMultipleOf8(int64(blk.Meta)):
-               return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d 
position for record %d", blk.Meta, i)
-       case !bitutil.IsMultipleOf8(blk.Body):
-               return nil, fmt.Errorf("arrow/ipc: invalid file body=%d 
position for record %d", blk.Body, i)
+       case !bitutil.IsMultipleOf8(blk.Offset()):
+               return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for 
record %d", blk.Offset(), i)
+       case !bitutil.IsMultipleOf8(int64(blk.Meta())):
+               return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d 
position for record %d", blk.Meta(), i)
+       case !bitutil.IsMultipleOf8(blk.Body()):
+               return nil, fmt.Errorf("arrow/ipc: invalid file body=%d 
position for record %d", blk.Body(), i)
        }
 
        msg, err := blk.NewMessage()
@@ -301,7 +403,7 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, error) {
                return nil, fmt.Errorf("arrow/ipc: message %d is not a Record", 
i)
        }
 
-       return newRecord(f.schema, &f.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem), nil
+       return newRecord(f.schema, &f.memo, msg.meta, msg.body, 
f.swapEndianness, f.mem), nil
 }
 
 // Read reads the current record from the underlying stream and an error, if 
any.
@@ -323,7 +425,7 @@ func (f *FileReader) ReadAt(i int64) (arrow.Record, error) {
        return f.Record(int(i))
 }
 
-func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta 
*memory.Buffer, body ReadAtSeeker, swapEndianness bool, mem memory.Allocator) 
arrow.Record {
+func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta 
*memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) 
arrow.Record {
        var (
                msg   = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
                md    flatbuf.RecordBatch
@@ -340,10 +442,10 @@ func newRecord(schema *arrow.Schema, memo 
*dictutils.Memo, meta *memory.Buffer,
 
        ctx := &arrayLoaderContext{
                src: ipcSource{
-                       meta:  &md,
-                       r:     body,
-                       codec: codec,
-                       mem:   mem,
+                       meta:     &md,
+                       rawBytes: body,
+                       codec:    codec,
+                       mem:      mem,
                },
                memo:    memo,
                max:     kMaxNestingDepth,
@@ -372,10 +474,10 @@ func newRecord(schema *arrow.Schema, memo 
*dictutils.Memo, meta *memory.Buffer,
 }
 
 type ipcSource struct {
-       meta  *flatbuf.RecordBatch
-       r     ReadAtSeeker
-       codec decompressor
-       mem   memory.Allocator
+       meta     *flatbuf.RecordBatch
+       rawBytes *memory.Buffer
+       codec    decompressor
+       mem      memory.Allocator
 }
 
 func (src *ipcSource) buffer(i int) *memory.Buffer {
@@ -388,34 +490,23 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
                return memory.NewBufferBytes(nil)
        }
 
-       raw := memory.NewResizableBuffer(src.mem)
+       var raw *memory.Buffer
        if src.codec == nil {
-               raw.Resize(int(buf.Length()))
-               _, err := src.r.ReadAt(raw.Bytes(), buf.Offset())
-               if err != nil {
-                       panic(err)
-               }
+               raw = memory.SliceBuffer(src.rawBytes, int(buf.Offset()), 
int(buf.Length()))
        } else {
-               sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length())
-               var uncompressedSize uint64
+               body := src.rawBytes.Bytes()[buf.Offset() : 
buf.Offset()+buf.Length()]
+               uncompressedSize := int64(binary.LittleEndian.Uint64(body[:8]))
 
-               err := binary.Read(sr, binary.LittleEndian, &uncompressedSize)
-               if err != nil {
-                       panic(err)
-               }
-
-               var r io.Reader = sr
                // check for an uncompressed buffer
-               if int64(uncompressedSize) != -1 {
+               if uncompressedSize != -1 {
+                       raw = memory.NewResizableBuffer(src.mem)
                        raw.Resize(int(uncompressedSize))
-                       src.codec.Reset(sr)
-                       r = src.codec
+                       src.codec.Reset(bytes.NewReader(body[8:]))
+                       if _, err := io.ReadFull(src.codec, raw.Bytes()); err 
!= nil {
+                               panic(err)
+                       }
                } else {
-                       raw.Resize(int(buf.Length() - 8))
-               }
-
-               if _, err = io.ReadFull(r, raw.Bytes()); err != nil {
-                       panic(err)
+                       raw = memory.SliceBuffer(src.rawBytes, 
int(buf.Offset())+8, int(buf.Length())-8)
                }
        }
 
@@ -717,7 +808,7 @@ func (ctx *arrayLoaderContext) loadUnion(dt 
arrow.UnionType) arrow.ArrayData {
        return array.NewData(dt, int(field.Length()), buffers, subs, 0, 0)
 }
 
-func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body 
ReadAtSeeker, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, 
error) {
+func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body 
*memory.Buffer, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, 
error) {
        var (
                msg   = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
                md    flatbuf.DictionaryBatch
@@ -743,10 +834,10 @@ func readDictionary(memo *dictutils.Memo, meta 
*memory.Buffer, body ReadAtSeeker
 
        ctx := &arrayLoaderContext{
                src: ipcSource{
-                       meta:  &data,
-                       codec: codec,
-                       r:     body,
-                       mem:   mem,
+                       meta:     &data,
+                       codec:    codec,
+                       rawBytes: body,
+                       mem:      mem,
                },
                memo: memo,
                max:  kMaxNestingDepth,
@@ -768,3 +859,44 @@ func readDictionary(memo *dictutils.Memo, meta 
*memory.Buffer, body ReadAtSeeker
        }
        return dictutils.KindReplacement, nil
 }
+
+type mappedFileBlock struct {
+       offset int64
+       meta   int32
+       body   int64
+
+       data []byte
+}
+
+func (blk mappedFileBlock) Offset() int64 { return blk.offset }
+func (blk mappedFileBlock) Meta() int32   { return blk.meta }
+func (blk mappedFileBlock) Body() int64   { return blk.body }
+
+func (blk mappedFileBlock) section() []byte {
+       return blk.data[blk.offset : blk.offset+int64(blk.meta)+blk.body]
+}
+
+func (blk mappedFileBlock) NewMessage() (*Message, error) {
+       var (
+               body *memory.Buffer
+               meta *memory.Buffer
+               buf  = blk.section()
+       )
+
+       metaBytes := buf[:blk.meta]
+
+       prefix := 0
+       switch binary.LittleEndian.Uint32(metaBytes) {
+       case 0:
+       case kIPCContToken:
+               prefix = 8
+       default:
+               // ARROW-6314: backwards compatibility for reading old IPC
+               // messages produced prior to version 0.15.0
+               prefix = 4
+       }
+
+       meta = memory.NewBufferBytes(metaBytes[prefix:])
+       body = memory.NewBufferBytes(buf[blk.meta : int64(blk.meta)+blk.body])
+       return NewMessage(meta, body), nil
+}
diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go
index c6aefd5..b57cafb 100644
--- a/arrow/ipc/file_writer.go
+++ b/arrow/ipc/file_writer.go
@@ -41,8 +41,8 @@ type fileWriter struct {
        streamWriter
 
        schema *arrow.Schema
-       dicts  []fileBlock
-       recs   []fileBlock
+       dicts  []dataBlock
+       recs   []dataBlock
 }
 
 func (w *fileWriter) Start() error {
@@ -63,13 +63,13 @@ func (w *fileWriter) Start() error {
 }
 
 func (w *fileWriter) WritePayload(p Payload) error {
-       blk := fileBlock{Offset: w.pos, Meta: 0, Body: p.size}
+       blk := fileBlock{offset: w.pos, meta: 0, body: p.size}
        n, err := writeIPCPayload(w, p)
        if err != nil {
                return err
        }
 
-       blk.Meta = int32(n)
+       blk.meta = int32(n)
 
        switch flatbuf.MessageHeader(p.msg) {
        case flatbuf.MessageHeaderDictionaryBatch:
diff --git a/arrow/ipc/metadata.go b/arrow/ipc/metadata.go
index a5bf187..b83c1a8 100644
--- a/arrow/ipc/metadata.go
+++ b/arrow/ipc/metadata.go
@@ -63,19 +63,23 @@ type bufferMetadata struct {
 }
 
 type fileBlock struct {
-       Offset int64
-       Meta   int32
-       Body   int64
+       offset int64
+       meta   int32
+       body   int64
 
        r   io.ReaderAt
        mem memory.Allocator
 }
 
-func fileBlocksToFB(b *flatbuffers.Builder, blocks []fileBlock, start 
startVecFunc) flatbuffers.UOffsetT {
+func (blk fileBlock) Offset() int64 { return blk.offset }
+func (blk fileBlock) Meta() int32   { return blk.meta }
+func (blk fileBlock) Body() int64   { return blk.body }
+
+func fileBlocksToFB(b *flatbuffers.Builder, blocks []dataBlock, start 
startVecFunc) flatbuffers.UOffsetT {
        start(b, len(blocks))
        for i := len(blocks) - 1; i >= 0; i-- {
                blk := blocks[i]
-               flatbuf.CreateBlock(b, blk.Offset, blk.Meta, blk.Body)
+               flatbuf.CreateBlock(b, blk.Offset(), blk.Meta(), blk.Body())
        }
 
        return b.EndVector(len(blocks))
@@ -91,7 +95,7 @@ func (blk fileBlock) NewMessage() (*Message, error) {
        )
 
        meta = memory.NewResizableBuffer(blk.mem)
-       meta.Resize(int(blk.Meta))
+       meta.Resize(int(blk.meta))
        defer meta.Release()
 
        buf = meta.Bytes()
@@ -112,12 +116,12 @@ func (blk fileBlock) NewMessage() (*Message, error) {
        }
 
        // drop buf-size already known from blk.Meta
-       meta = memory.SliceBuffer(meta, prefix, int(blk.Meta)-prefix)
+       meta = memory.SliceBuffer(meta, prefix, int(blk.meta)-prefix)
        defer meta.Release()
 
        body = memory.NewResizableBuffer(blk.mem)
        defer body.Release()
-       body.Resize(int(blk.Body))
+       body.Resize(int(blk.body))
        buf = body.Bytes()
        _, err = io.ReadFull(r, buf)
        if err != nil {
@@ -128,7 +132,7 @@ func (blk fileBlock) NewMessage() (*Message, error) {
 }
 
 func (blk fileBlock) section() io.Reader {
-       return io.NewSectionReader(blk.r, blk.Offset, int64(blk.Meta)+blk.Body)
+       return io.NewSectionReader(blk.r, blk.offset, int64(blk.meta)+blk.body)
 }
 
 func unitFromFB(unit flatbuf.TimeUnit) arrow.TimeUnit {
@@ -1156,7 +1160,7 @@ func writeSchemaMessage(schema *arrow.Schema, mem 
memory.Allocator, dict *dictut
        return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
 }
 
-func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w 
io.Writer) error {
+func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w 
io.Writer) error {
        var (
                b    = flatbuffers.NewBuilder(1024)
                memo dictutils.Mapper
diff --git a/arrow/ipc/metadata_test.go b/arrow/ipc/metadata_test.go
index 29b9b7f..0acce42 100644
--- a/arrow/ipc/metadata_test.go
+++ b/arrow/ipc/metadata_test.go
@@ -89,8 +89,8 @@ func TestRWSchema(t *testing.T) {
 func TestRWFooter(t *testing.T) {
        for _, tc := range []struct {
                schema *arrow.Schema
-               dicts  []fileBlock
-               recs   []fileBlock
+               dicts  []dataBlock
+               recs   []dataBlock
        }{
                {
                        schema: arrow.NewSchema([]arrow.Field{
@@ -98,15 +98,15 @@ func TestRWFooter(t *testing.T) {
                                {Name: "f2", Type: arrow.PrimitiveTypes.Uint16},
                                {Name: "f3", Type: 
arrow.PrimitiveTypes.Float64},
                        }, nil),
-                       dicts: []fileBlock{
-                               {Offset: 1, Meta: 2, Body: 3},
-                               {Offset: 4, Meta: 5, Body: 6},
-                               {Offset: 7, Meta: 8, Body: 9},
+                       dicts: []dataBlock{
+                               fileBlock{offset: 1, meta: 2, body: 3},
+                               fileBlock{offset: 4, meta: 5, body: 6},
+                               fileBlock{offset: 7, meta: 8, body: 9},
                        },
-                       recs: []fileBlock{
-                               {Offset: 0, Meta: 10, Body: 30},
-                               {Offset: 10, Meta: 30, Body: 60},
-                               {Offset: 20, Meta: 30, Body: 40},
+                       recs: []dataBlock{
+                               fileBlock{offset: 0, meta: 10, body: 30},
+                               fileBlock{offset: 10, meta: 30, body: 60},
+                               fileBlock{offset: 20, meta: 30, body: 40},
                        },
                },
        } {
@@ -142,7 +142,7 @@ func TestRWFooter(t *testing.T) {
                                if !footer.Dictionaries(&blk, i) {
                                        t.Fatalf("could not get dictionary %d", 
i)
                                }
-                               got := fileBlock{Offset: blk.Offset(), Meta: 
blk.MetaDataLength(), Body: blk.BodyLength()}
+                               got := fileBlock{offset: blk.Offset(), meta: 
blk.MetaDataLength(), body: blk.BodyLength()}
                                want := dict
                                if got != want {
                                        t.Errorf("dict[%d] differ:\ngot= 
%v\nwant=%v", i, got, want)
@@ -158,7 +158,7 @@ func TestRWFooter(t *testing.T) {
                                if !footer.RecordBatches(&blk, i) {
                                        t.Fatalf("could not get record %d", i)
                                }
-                               got := fileBlock{Offset: blk.Offset(), Meta: 
blk.MetaDataLength(), Body: blk.BodyLength()}
+                               got := fileBlock{offset: blk.Offset(), meta: 
blk.MetaDataLength(), body: blk.BodyLength()}
                                want := rec
                                if got != want {
                                        t.Errorf("record[%d] differ:\ngot= 
%v\nwant=%v", i, got, want)
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index f74fddd..2a4f859 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -17,7 +17,6 @@
 package ipc
 
 import (
-       "bytes"
        "errors"
        "fmt"
        "io"
@@ -201,7 +200,7 @@ func (r *Reader) getInitialDicts() bool {
                if msg.Type() != MessageDictionaryBatch {
                        r.err = fmt.Errorf("arrow/ipc: IPC stream did not have 
the expected (%d) dictionaries at the start of the stream", numDicts)
                }
-               if _, err := readDictionary(&r.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); err != nil {
+               if _, err := readDictionary(&r.memo, msg.meta, msg.body, 
r.swapEndianness, r.mem); err != nil {
                        r.done = true
                        r.err = err
                        return false
@@ -233,7 +232,7 @@ func (r *Reader) next() bool {
        msg, r.err = r.r.Message()
 
        for msg != nil && msg.Type() == MessageDictionaryBatch {
-               if _, r.err = readDictionary(&r.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); r.err != nil {
+               if _, r.err = readDictionary(&r.memo, msg.meta, msg.body, 
r.swapEndianness, r.mem); r.err != nil {
                        r.done = true
                        return false
                }
@@ -252,7 +251,7 @@ func (r *Reader) next() bool {
                return false
        }
 
-       r.rec = newRecord(r.schema, &r.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem)
+       r.rec = newRecord(r.schema, &r.memo, msg.meta, msg.body, 
r.swapEndianness, r.mem)
        return true
 }
 
diff --git a/arrow/ipc/reader_test.go b/arrow/ipc/reader_test.go
index 792519b..a78bb75 100644
--- a/arrow/ipc/reader_test.go
+++ b/arrow/ipc/reader_test.go
@@ -21,6 +21,7 @@ import (
        "fmt"
        "io"
        "testing"
+       "unsafe"
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
@@ -96,6 +97,79 @@ func TestReaderCheckedAllocator(t *testing.T) {
        require.NoError(t, err)
 }
 
+func TestMappedReader(t *testing.T) {
+       pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer pool.AssertSize(t, 0)
+       schema := arrow.NewSchema([]arrow.Field{{Name: "f1", Type: 
arrow.PrimitiveTypes.Int32}}, nil)
+       b := array.NewRecordBuilder(pool, schema)
+       defer b.Release()
+       b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4}, 
[]bool{true, true, false, true})
+
+       rec1 := b.NewRecord()
+       defer rec1.Release()
+
+       tbl := array.NewTableFromRecords(schema, []arrow.Record{rec1})
+       defer tbl.Release()
+
+       var buf bytes.Buffer
+       ipcWriter, err := NewFileWriter(&buf, WithAllocator(pool), 
WithSchema(schema))
+       require.NoError(t, err)
+
+       t.Log("Reading data before")
+       tr := array.NewTableReader(tbl, 2)
+       defer tr.Release()
+
+       n := 0
+       for tr.Next() {
+               rec := tr.Record()
+               for i, col := range rec.Columns() {
+                       t.Logf("rec[%d][%q]: %v nulls:%v\n", n,
+                               rec.ColumnName(i), col, col.NullBitmapBytes())
+               }
+               n++
+               err := ipcWriter.Write(rec)
+               if err != nil {
+                       panic(err)
+               }
+       }
+       require.NoError(t, ipcWriter.Close())
+
+       t.Log("Reading data after")
+       rdr, err := NewMappedFileReader(buf.Bytes(), WithAllocator(pool))
+       require.NoError(t, err)
+       defer rdr.Close()
+
+       rec, err := rdr.RecordAt(0)
+       require.NoError(t, err)
+       defer rec.Release()
+
+       // get offset and block info into the buffer bytes
+       blk, err := rdr.r.block(nil, &rdr.footer, 0)
+       require.NoError(t, err)
+
+       // determine pointer location of bytes for the first buffer
+       // no nulls, so only one buffer
+       start := unsafe.Pointer(unsafe.SliceData(buf.Bytes()))
+       loc := unsafe.Add(unsafe.Add(start, blk.Offset()), blk.Meta())
+       // ensure our buffer pointer matches the calculated pointer
+       assert.Equal(t, (*byte)(loc), 
unsafe.SliceData(rec.Column(0).Data().Buffers()[1].Bytes()))
+
+       rec, err = rdr.RecordAt(1)
+       require.NoError(t, err)
+       defer rec.Release()
+
+       blk, err = rdr.r.block(nil, &rdr.footer, 1)
+       require.NoError(t, err)
+
+       start = unsafe.Pointer(unsafe.SliceData(buf.Bytes()))
+       loc = unsafe.Add(unsafe.Add(start, blk.Offset()), blk.Meta())
+       // check pointer of validity bitmap location
+       assert.Equal(t, (*byte)(loc), 
unsafe.SliceData(rec.Column(0).Data().Buffers()[0].Bytes()))
+       // calculate and check pointer of data buffer
+       loc = unsafe.Add(loc, rec.Column(0).Data().Buffers()[0].Len())
+       assert.Equal(t, (*byte)(loc), 
unsafe.SliceData(rec.Column(0).Data().Buffers()[1].Bytes()))
+}
+
 func BenchmarkIPC(b *testing.B) {
        alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
        defer alloc.AssertSize(b, 0)

Reply via email to