This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 95c4e4d feat(arrow/ipc): implement lazy loading/zero-copy for IPC files (#216)
95c4e4d is described below
commit 95c4e4d1297b8413cbf208a435088022586cdfc9
Author: Matt Topol <[email protected]>
AuthorDate: Wed Dec 18 14:57:45 2024 -0500
feat(arrow/ipc): implement lazy loading/zero-copy for IPC files (#216)
### Rationale for this change
closes #207
### What changes are included in this PR?
Adds a new function `NewMappedFileReader` to the ipc package which accepts a
byte slice instead of a `ReadAtSeeker`. Updates `ipcSource` to reference the
raw byte slices from the input directly instead of wrapping them with
`bytes.NewReader`, which forces copies via `Read`, `ReadFull`, etc.
### Are these changes tested?
Unit tests added to confirm that the pointers match and that we aren't
allocating unnecessarily.
### Are there any user-facing changes?
Shouldn't be any user-facing changes other than a reduction in memory
usage when reading non-compressed IPC data.
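
For illustration only (not part of this commit), a minimal sketch of how the new
reader might be combined with `syscall.Mmap`. The file name and error handling
here are hypothetical, and the mmap call is Unix-specific:

```go
package main

import (
	"fmt"
	"os"
	"syscall"

	"github.com/apache/arrow-go/v18/arrow/ipc"
)

func main() {
	// "data.arrow" is a placeholder path to an existing Arrow IPC file.
	f, err := os.Open("data.arrow")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		panic(err)
	}

	// Map the whole file read-only (Linux/macOS).
	data, err := syscall.Mmap(int(f.Fd()), 0, int(st.Size()),
		syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer syscall.Munmap(data)

	// The mapped reader slices the mapped bytes directly, so uncompressed
	// column buffers reference the mapping instead of being copied.
	rdr, err := ipc.NewMappedFileReader(data)
	if err != nil {
		panic(err)
	}
	defer rdr.Close()

	for i := 0; i < rdr.NumRecords(); i++ {
		rec, err := rdr.RecordAt(i)
		if err != nil {
			panic(err)
		}
		fmt.Println(rec)
		rec.Release()
	}
}
```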
---
arrow/ipc/file_reader.go | 392 ++++++++++++++++++++++++++++++---------------
arrow/ipc/file_writer.go | 8 +-
arrow/ipc/metadata.go | 24 +--
arrow/ipc/metadata_test.go | 24 +--
arrow/ipc/reader.go | 7 +-
arrow/ipc/reader_test.go | 74 +++++++++
6 files changed, 369 insertions(+), 160 deletions(-)
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index 2715831..9135529 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -33,15 +33,127 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
)
-// FileReader is an Arrow file reader.
-type FileReader struct {
+type readerImpl interface {
+ getFooterEnd() (int64, error)
+ getBytes(offset, length int64) ([]byte, error)
+ dict(memory.Allocator, *footerBlock, int) (dataBlock, error)
+ block(memory.Allocator, *footerBlock, int) (dataBlock, error)
+}
+
+type footerBlock struct {
+ offset int64
+ buffer *memory.Buffer
+ data *flatbuf.Footer
+}
+
+type dataBlock interface {
+ Offset() int64
+ Meta() int32
+ Body() int64
+ NewMessage() (*Message, error)
+}
+
+const footerSizeLen = 4
+
+var minimumOffsetSize = int64(len(Magic)*2 + footerSizeLen)
+
+type basicReaderImpl struct {
r ReadAtSeeker
+}
- footer struct {
- offset int64
- buffer *memory.Buffer
- data *flatbuf.Footer
+func (r *basicReaderImpl) getBytes(offset, len int64) ([]byte, error) {
+ buf := make([]byte, len)
+ n, err := r.r.ReadAt(buf, offset)
+ if err != nil {
+ return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at
offset %d: %w", len, offset, err)
+ }
+ if int64(n) != len {
+ return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at
offset %d", len, offset)
}
+ return buf, nil
+}
+
+func (r *basicReaderImpl) getFooterEnd() (int64, error) {
+ return r.r.Seek(0, io.SeekEnd)
+}
+
+func (r *basicReaderImpl) block(mem memory.Allocator, f *footerBlock, i int) (dataBlock, error) {
+ var blk flatbuf.Block
+ if !f.data.RecordBatches(&blk, i) {
+ return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", i)
+ }
+
+ return fileBlock{
+ offset: blk.Offset(),
+ meta: blk.MetaDataLength(),
+ body: blk.BodyLength(),
+ r: r.r,
+ mem: mem,
+ }, nil
+}
+
+func (r *basicReaderImpl) dict(mem memory.Allocator, f *footerBlock, i int) (dataBlock, error) {
+ var blk flatbuf.Block
+ if !f.data.Dictionaries(&blk, i) {
+ return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", i)
+ }
+
+ return fileBlock{
+ offset: blk.Offset(),
+ meta: blk.MetaDataLength(),
+ body: blk.BodyLength(),
+ r: r.r,
+ mem: mem,
+ }, nil
+}
+
+type mappedReaderImpl struct {
+ data []byte
+}
+
+func (r *mappedReaderImpl) getBytes(offset, length int64) ([]byte, error) {
+ if offset < 0 || offset+int64(length) > int64(len(r.data)) {
+ return nil, fmt.Errorf("arrow/ipc: invalid offset=%d or
length=%d", offset, length)
+ }
+
+ return r.data[offset : offset+length], nil
+}
+
+func (r *mappedReaderImpl) getFooterEnd() (int64, error) { return int64(len(r.data)), nil }
+
+func (r *mappedReaderImpl) block(_ memory.Allocator, f *footerBlock, i int) (dataBlock, error) {
+ var blk flatbuf.Block
+ if !f.data.RecordBatches(&blk, i) {
+ return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", i)
+ }
+
+ return mappedFileBlock{
+ offset: blk.Offset(),
+ meta: blk.MetaDataLength(),
+ body: blk.BodyLength(),
+ data: r.data,
+ }, nil
+}
+
+func (r *mappedReaderImpl) dict(_ memory.Allocator, f *footerBlock, i int) (dataBlock, error) {
+ var blk flatbuf.Block
+ if !f.data.Dictionaries(&blk, i) {
+ return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", i)
+ }
+
+ return mappedFileBlock{
+ offset: blk.Offset(),
+ meta: blk.MetaDataLength(),
+ body: blk.BodyLength(),
+ data: r.data,
+ }, nil
+}
+
+// FileReader is an Arrow file reader.
+type FileReader struct {
+ r readerImpl
+
+ footer footerBlock
// fields dictTypeMap
memo dictutils.Memo
@@ -56,81 +168,71 @@ type FileReader struct {
swapEndianness bool
}
+// NewMappedFileReader is like NewFileReader but instead of using a ReadAtSeeker,
+// which will force copies through the Read/ReadAt methods, it uses a byte slice
+// and pulls slices directly from the data. This is useful specifically when
+// dealing with mmapped data so that you can lazily load the buffers and avoid
+// extraneous copies. The slices used for the record column buffers will simply
+// reference the existing data instead of performing copies via ReadAt/Read.
+//
+// For example, syscall.Mmap returns a byte slice which could be referencing
+// a shared memory region or otherwise a memory-mapped file.
+func NewMappedFileReader(data []byte, opts ...Option) (*FileReader, error) {
+ var (
+ cfg = newConfig(opts...)
+ f = FileReader{
+ r: &mappedReaderImpl{data: data},
+ mem: cfg.alloc,
+ }
+ )
+
+ if err := f.init(cfg); err != nil {
+ return nil, err
+ }
+ return &f, nil
+}
+
// NewFileReader opens an Arrow file using the provided reader r.
func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) {
var (
cfg = newConfig(opts...)
- err error
-
- f = FileReader{
- r: r,
+ f = FileReader{
+ r: &basicReaderImpl{r: r},
memo: dictutils.NewMemo(),
mem: cfg.alloc,
}
)
+ if err := f.init(cfg); err != nil {
+ return nil, err
+ }
+ return &f, nil
+}
+
+func (f *FileReader) init(cfg *config) error {
+ var err error
if cfg.footer.offset <= 0 {
- cfg.footer.offset, err = f.r.Seek(0, io.SeekEnd)
+ cfg.footer.offset, err = f.r.getFooterEnd()
if err != nil {
- return nil, fmt.Errorf("arrow/ipc: could retrieve
footer offset: %w", err)
+ return fmt.Errorf("arrow/ipc: could retrieve footer
offset: %w", err)
}
}
f.footer.offset = cfg.footer.offset
err = f.readFooter()
if err != nil {
- return nil, fmt.Errorf("arrow/ipc: could not decode footer:
%w", err)
+ return fmt.Errorf("arrow/ipc: could not decode footer: %w", err)
}
err = f.readSchema(cfg.ensureNativeEndian)
if err != nil {
- return nil, fmt.Errorf("arrow/ipc: could not decode schema:
%w", err)
+ return fmt.Errorf("arrow/ipc: could not decode schema: %w", err)
}
if cfg.schema != nil && !cfg.schema.Equal(f.schema) {
- return nil, fmt.Errorf("arrow/ipc: inconsistent schema for
reading (got: %v, want: %v)", f.schema, cfg.schema)
- }
-
- return &f, err
-}
-
-func (f *FileReader) readFooter() error {
- var err error
-
- if f.footer.offset <= int64(len(Magic)*2+4) {
- return fmt.Errorf("arrow/ipc: file too small (size=%d)",
f.footer.offset)
- }
-
- eof := int64(len(Magic) + 4)
- buf := make([]byte, eof)
- n, err := f.r.ReadAt(buf, f.footer.offset-eof)
- if err != nil {
- return fmt.Errorf("arrow/ipc: could not read footer: %w", err)
- }
- if n != len(buf) {
- return fmt.Errorf("arrow/ipc: could not read %d bytes from end
of file", len(buf))
+ return fmt.Errorf("arrow/ipc: inconsistent schema for reading
(got: %v, want: %v)", f.schema, cfg.schema)
}
- if !bytes.Equal(buf[4:], Magic) {
- return errNotArrowFile
- }
-
- size := int64(binary.LittleEndian.Uint32(buf[:4]))
- if size <= 0 || size+int64(len(Magic)*2+4) > f.footer.offset {
- return errInconsistentFileMetadata
- }
-
- buf = make([]byte, size)
- n, err = f.r.ReadAt(buf, f.footer.offset-size-eof)
- if err != nil {
- return fmt.Errorf("arrow/ipc: could not read footer data: %w",
err)
- }
- if n != len(buf) {
- return fmt.Errorf("arrow/ipc: could not read %d bytes from
footer data", len(buf))
- }
-
- f.footer.buffer = memory.NewBufferBytes(buf)
- f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
return err
}
@@ -155,17 +257,17 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) error {
}
for i := 0; i < f.NumDictionaries(); i++ {
- blk, err := f.dict(i)
+ blk, err := f.r.dict(f.mem, &f.footer, i)
if err != nil {
return fmt.Errorf("arrow/ipc: could not read
dictionary[%d]: %w", i, err)
}
switch {
- case !bitutil.IsMultipleOf8(blk.Offset):
- return fmt.Errorf("arrow/ipc: invalid file offset=%d
for dictionary %d", blk.Offset, i)
- case !bitutil.IsMultipleOf8(int64(blk.Meta)):
- return fmt.Errorf("arrow/ipc: invalid file metadata=%d
position for dictionary %d", blk.Meta, i)
- case !bitutil.IsMultipleOf8(blk.Body):
- return fmt.Errorf("arrow/ipc: invalid file body=%d
position for dictionary %d", blk.Body, i)
+ case !bitutil.IsMultipleOf8(blk.Offset()):
+ return fmt.Errorf("arrow/ipc: invalid file offset=%d
for dictionary %d", blk.Offset(), i)
+ case !bitutil.IsMultipleOf8(int64(blk.Meta())):
+ return fmt.Errorf("arrow/ipc: invalid file metadata=%d
position for dictionary %d", blk.Meta(), i)
+ case !bitutil.IsMultipleOf8(blk.Body()):
+ return fmt.Errorf("arrow/ipc: invalid file body=%d
position for dictionary %d", blk.Body(), i)
}
msg, err := blk.NewMessage()
@@ -173,7 +275,7 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) error {
return err
}
- kind, err = readDictionary(&f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem)
+ kind, err = readDictionary(&f.memo, msg.meta, msg.body, f.swapEndianness, f.mem)
if err != nil {
return err
}
@@ -185,34 +287,34 @@ func (f *FileReader) readSchema(ensureNativeEndian bool) error {
return err
}
-func (f *FileReader) block(i int) (fileBlock, error) {
- var blk flatbuf.Block
- if !f.footer.data.RecordBatches(&blk, i) {
- return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", i)
+func (f *FileReader) readFooter() error {
+ if f.footer.offset <= minimumOffsetSize {
+ return fmt.Errorf("arrow/ipc: file too small (size=%d)",
f.footer.offset)
}
- return fileBlock{
- Offset: blk.Offset(),
- Meta: blk.MetaDataLength(),
- Body: blk.BodyLength(),
- r: f.r,
- mem: f.mem,
- }, nil
-}
+ eof := int64(len(Magic) + footerSizeLen)
+ buf, err := f.r.getBytes(f.footer.offset-eof, eof)
+ if err != nil {
+ return err
+ }
-func (f *FileReader) dict(i int) (fileBlock, error) {
- var blk flatbuf.Block
- if !f.footer.data.Dictionaries(&blk, i) {
- return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", i)
+ if !bytes.Equal(buf[4:], Magic) {
+ return errNotArrowFile
}
- return fileBlock{
- Offset: blk.Offset(),
- Meta: blk.MetaDataLength(),
- Body: blk.BodyLength(),
- r: f.r,
- mem: f.mem,
- }, nil
+ size := int64(binary.LittleEndian.Uint32(buf[:footerSizeLen]))
+ if size <= 0 || size+minimumOffsetSize > f.footer.offset {
+ return errInconsistentFileMetadata
+ }
+
+ buf, err = f.r.getBytes(f.footer.offset-size-eof, size)
+ if err != nil {
+ return err
+ }
+
+ f.footer.buffer = memory.NewBufferBytes(buf)
+ f.footer.data = flatbuf.GetRootAsFooter(buf, 0)
+ return nil
}
func (f *FileReader) Schema() *arrow.Schema {
@@ -278,17 +380,17 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, error) {
panic("arrow/ipc: record index out of bounds")
}
- blk, err := f.block(i)
+ blk, err := f.r.block(f.mem, &f.footer, i)
if err != nil {
return nil, err
}
switch {
- case !bitutil.IsMultipleOf8(blk.Offset):
- return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for
record %d", blk.Offset, i)
- case !bitutil.IsMultipleOf8(int64(blk.Meta)):
- return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d
position for record %d", blk.Meta, i)
- case !bitutil.IsMultipleOf8(blk.Body):
- return nil, fmt.Errorf("arrow/ipc: invalid file body=%d
position for record %d", blk.Body, i)
+ case !bitutil.IsMultipleOf8(blk.Offset()):
+ return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for
record %d", blk.Offset(), i)
+ case !bitutil.IsMultipleOf8(int64(blk.Meta())):
+ return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d
position for record %d", blk.Meta(), i)
+ case !bitutil.IsMultipleOf8(blk.Body()):
+ return nil, fmt.Errorf("arrow/ipc: invalid file body=%d
position for record %d", blk.Body(), i)
}
msg, err := blk.NewMessage()
@@ -301,7 +403,7 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, error) {
return nil, fmt.Errorf("arrow/ipc: message %d is not a Record",
i)
}
- return newRecord(f.schema, &f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem), nil
+ return newRecord(f.schema, &f.memo, msg.meta, msg.body, f.swapEndianness, f.mem), nil
}
// Read reads the current record from the underlying stream and an error, if any.
@@ -323,7 +425,7 @@ func (f *FileReader) ReadAt(i int64) (arrow.Record, error) {
return f.Record(int(i))
}
-func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, swapEndianness bool, mem memory.Allocator) arrow.Record {
+func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) arrow.Record {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
md flatbuf.RecordBatch
@@ -340,10 +442,10 @@ func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer,
ctx := &arrayLoaderContext{
src: ipcSource{
- meta: &md,
- r: body,
- codec: codec,
- mem: mem,
+ meta: &md,
+ rawBytes: body,
+ codec: codec,
+ mem: mem,
},
memo: memo,
max: kMaxNestingDepth,
@@ -372,10 +474,10 @@ func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer,
}
type ipcSource struct {
- meta *flatbuf.RecordBatch
- r ReadAtSeeker
- codec decompressor
- mem memory.Allocator
+ meta *flatbuf.RecordBatch
+ rawBytes *memory.Buffer
+ codec decompressor
+ mem memory.Allocator
}
func (src *ipcSource) buffer(i int) *memory.Buffer {
@@ -388,34 +490,23 @@ func (src *ipcSource) buffer(i int) *memory.Buffer {
return memory.NewBufferBytes(nil)
}
- raw := memory.NewResizableBuffer(src.mem)
+ var raw *memory.Buffer
if src.codec == nil {
- raw.Resize(int(buf.Length()))
- _, err := src.r.ReadAt(raw.Bytes(), buf.Offset())
- if err != nil {
- panic(err)
- }
+ raw = memory.SliceBuffer(src.rawBytes, int(buf.Offset()), int(buf.Length()))
} else {
- sr := io.NewSectionReader(src.r, buf.Offset(), buf.Length())
- var uncompressedSize uint64
+ body := src.rawBytes.Bytes()[buf.Offset() : buf.Offset()+buf.Length()]
+ uncompressedSize := int64(binary.LittleEndian.Uint64(body[:8]))
- err := binary.Read(sr, binary.LittleEndian, &uncompressedSize)
- if err != nil {
- panic(err)
- }
-
- var r io.Reader = sr
// check for an uncompressed buffer
- if int64(uncompressedSize) != -1 {
+ if uncompressedSize != -1 {
+ raw = memory.NewResizableBuffer(src.mem)
raw.Resize(int(uncompressedSize))
- src.codec.Reset(sr)
- r = src.codec
+ src.codec.Reset(bytes.NewReader(body[8:]))
+ if _, err := io.ReadFull(src.codec, raw.Bytes()); err != nil {
+ panic(err)
+ }
} else {
- raw.Resize(int(buf.Length() - 8))
- }
-
- if _, err = io.ReadFull(r, raw.Bytes()); err != nil {
- panic(err)
+ raw = memory.SliceBuffer(src.rawBytes, int(buf.Offset())+8, int(buf.Length())-8)
}
}
@@ -717,7 +808,7 @@ func (ctx *arrayLoaderContext) loadUnion(dt arrow.UnionType) arrow.ArrayData {
return array.NewData(dt, int(field.Length()), buffers, subs, 0, 0)
}
-func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, error) {
+func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body *memory.Buffer, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, error) {
var (
msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0)
md flatbuf.DictionaryBatch
@@ -743,10 +834,10 @@ func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker
ctx := &arrayLoaderContext{
src: ipcSource{
- meta: &data,
- codec: codec,
- r: body,
- mem: mem,
+ meta: &data,
+ codec: codec,
+ rawBytes: body,
+ mem: mem,
},
memo: memo,
max: kMaxNestingDepth,
@@ -768,3 +859,44 @@ func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker
}
return dictutils.KindReplacement, nil
}
+
+type mappedFileBlock struct {
+ offset int64
+ meta int32
+ body int64
+
+ data []byte
+}
+
+func (blk mappedFileBlock) Offset() int64 { return blk.offset }
+func (blk mappedFileBlock) Meta() int32 { return blk.meta }
+func (blk mappedFileBlock) Body() int64 { return blk.body }
+
+func (blk mappedFileBlock) section() []byte {
+ return blk.data[blk.offset : blk.offset+int64(blk.meta)+blk.body]
+}
+
+func (blk mappedFileBlock) NewMessage() (*Message, error) {
+ var (
+ body *memory.Buffer
+ meta *memory.Buffer
+ buf = blk.section()
+ )
+
+ metaBytes := buf[:blk.meta]
+
+ prefix := 0
+ switch binary.LittleEndian.Uint32(metaBytes) {
+ case 0:
+ case kIPCContToken:
+ prefix = 8
+ default:
+ // ARROW-6314: backwards compatibility for reading old IPC
+ // messages produced prior to version 0.15.0
+ prefix = 4
+ }
+
+ meta = memory.NewBufferBytes(metaBytes[prefix:])
+ body = memory.NewBufferBytes(buf[blk.meta : int64(blk.meta)+blk.body])
+ return NewMessage(meta, body), nil
+}
diff --git a/arrow/ipc/file_writer.go b/arrow/ipc/file_writer.go
index c6aefd5..b57cafb 100644
--- a/arrow/ipc/file_writer.go
+++ b/arrow/ipc/file_writer.go
@@ -41,8 +41,8 @@ type fileWriter struct {
streamWriter
schema *arrow.Schema
- dicts []fileBlock
- recs []fileBlock
+ dicts []dataBlock
+ recs []dataBlock
}
func (w *fileWriter) Start() error {
@@ -63,13 +63,13 @@ func (w *fileWriter) Start() error {
}
func (w *fileWriter) WritePayload(p Payload) error {
- blk := fileBlock{Offset: w.pos, Meta: 0, Body: p.size}
+ blk := fileBlock{offset: w.pos, meta: 0, body: p.size}
n, err := writeIPCPayload(w, p)
if err != nil {
return err
}
- blk.Meta = int32(n)
+ blk.meta = int32(n)
switch flatbuf.MessageHeader(p.msg) {
case flatbuf.MessageHeaderDictionaryBatch:
diff --git a/arrow/ipc/metadata.go b/arrow/ipc/metadata.go
index a5bf187..b83c1a8 100644
--- a/arrow/ipc/metadata.go
+++ b/arrow/ipc/metadata.go
@@ -63,19 +63,23 @@ type bufferMetadata struct {
}
type fileBlock struct {
- Offset int64
- Meta int32
- Body int64
+ offset int64
+ meta int32
+ body int64
r io.ReaderAt
mem memory.Allocator
}
-func fileBlocksToFB(b *flatbuffers.Builder, blocks []fileBlock, start startVecFunc) flatbuffers.UOffsetT {
+func (blk fileBlock) Offset() int64 { return blk.offset }
+func (blk fileBlock) Meta() int32 { return blk.meta }
+func (blk fileBlock) Body() int64 { return blk.body }
+
+func fileBlocksToFB(b *flatbuffers.Builder, blocks []dataBlock, start startVecFunc) flatbuffers.UOffsetT {
start(b, len(blocks))
for i := len(blocks) - 1; i >= 0; i-- {
blk := blocks[i]
- flatbuf.CreateBlock(b, blk.Offset, blk.Meta, blk.Body)
+ flatbuf.CreateBlock(b, blk.Offset(), blk.Meta(), blk.Body())
}
return b.EndVector(len(blocks))
@@ -91,7 +95,7 @@ func (blk fileBlock) NewMessage() (*Message, error) {
)
meta = memory.NewResizableBuffer(blk.mem)
- meta.Resize(int(blk.Meta))
+ meta.Resize(int(blk.meta))
defer meta.Release()
buf = meta.Bytes()
@@ -112,12 +116,12 @@ func (blk fileBlock) NewMessage() (*Message, error) {
}
// drop buf-size already known from blk.Meta
- meta = memory.SliceBuffer(meta, prefix, int(blk.Meta)-prefix)
+ meta = memory.SliceBuffer(meta, prefix, int(blk.meta)-prefix)
defer meta.Release()
body = memory.NewResizableBuffer(blk.mem)
defer body.Release()
- body.Resize(int(blk.Body))
+ body.Resize(int(blk.body))
buf = body.Bytes()
_, err = io.ReadFull(r, buf)
if err != nil {
@@ -128,7 +132,7 @@ func (blk fileBlock) NewMessage() (*Message, error) {
}
func (blk fileBlock) section() io.Reader {
- return io.NewSectionReader(blk.r, blk.Offset, int64(blk.Meta)+blk.Body)
+ return io.NewSectionReader(blk.r, blk.offset, int64(blk.meta)+blk.body)
}
func unitFromFB(unit flatbuf.TimeUnit) arrow.TimeUnit {
@@ -1156,7 +1160,7 @@ func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictut
return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
}
-func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) error {
+func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w io.Writer) error {
var (
b = flatbuffers.NewBuilder(1024)
memo dictutils.Mapper
diff --git a/arrow/ipc/metadata_test.go b/arrow/ipc/metadata_test.go
index 29b9b7f..0acce42 100644
--- a/arrow/ipc/metadata_test.go
+++ b/arrow/ipc/metadata_test.go
@@ -89,8 +89,8 @@ func TestRWSchema(t *testing.T) {
func TestRWFooter(t *testing.T) {
for _, tc := range []struct {
schema *arrow.Schema
- dicts []fileBlock
- recs []fileBlock
+ dicts []dataBlock
+ recs []dataBlock
}{
{
schema: arrow.NewSchema([]arrow.Field{
@@ -98,15 +98,15 @@ func TestRWFooter(t *testing.T) {
{Name: "f2", Type: arrow.PrimitiveTypes.Uint16},
{Name: "f3", Type:
arrow.PrimitiveTypes.Float64},
}, nil),
- dicts: []fileBlock{
- {Offset: 1, Meta: 2, Body: 3},
- {Offset: 4, Meta: 5, Body: 6},
- {Offset: 7, Meta: 8, Body: 9},
+ dicts: []dataBlock{
+ fileBlock{offset: 1, meta: 2, body: 3},
+ fileBlock{offset: 4, meta: 5, body: 6},
+ fileBlock{offset: 7, meta: 8, body: 9},
},
- recs: []fileBlock{
- {Offset: 0, Meta: 10, Body: 30},
- {Offset: 10, Meta: 30, Body: 60},
- {Offset: 20, Meta: 30, Body: 40},
+ recs: []dataBlock{
+ fileBlock{offset: 0, meta: 10, body: 30},
+ fileBlock{offset: 10, meta: 30, body: 60},
+ fileBlock{offset: 20, meta: 30, body: 40},
},
},
} {
@@ -142,7 +142,7 @@ func TestRWFooter(t *testing.T) {
if !footer.Dictionaries(&blk, i) {
t.Fatalf("could not get dictionary %d",
i)
}
- got := fileBlock{Offset: blk.Offset(), Meta: blk.MetaDataLength(), Body: blk.BodyLength()}
+ got := fileBlock{offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength()}
want := dict
if got != want {
t.Errorf("dict[%d] differ:\ngot=
%v\nwant=%v", i, got, want)
@@ -158,7 +158,7 @@ func TestRWFooter(t *testing.T) {
if !footer.RecordBatches(&blk, i) {
t.Fatalf("could not get record %d", i)
}
- got := fileBlock{Offset: blk.Offset(), Meta: blk.MetaDataLength(), Body: blk.BodyLength()}
+ got := fileBlock{offset: blk.Offset(), meta: blk.MetaDataLength(), body: blk.BodyLength()}
want := rec
if got != want {
t.Errorf("record[%d] differ:\ngot=
%v\nwant=%v", i, got, want)
diff --git a/arrow/ipc/reader.go b/arrow/ipc/reader.go
index f74fddd..2a4f859 100644
--- a/arrow/ipc/reader.go
+++ b/arrow/ipc/reader.go
@@ -17,7 +17,6 @@
package ipc
import (
- "bytes"
"errors"
"fmt"
"io"
@@ -201,7 +200,7 @@ func (r *Reader) getInitialDicts() bool {
if msg.Type() != MessageDictionaryBatch {
r.err = fmt.Errorf("arrow/ipc: IPC stream did not have
the expected (%d) dictionaries at the start of the stream", numDicts)
}
- if _, err := readDictionary(&r.memo, msg.meta,
bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); err != nil {
+ if _, err := readDictionary(&r.memo, msg.meta, msg.body,
r.swapEndianness, r.mem); err != nil {
r.done = true
r.err = err
return false
@@ -233,7 +232,7 @@ func (r *Reader) next() bool {
msg, r.err = r.r.Message()
for msg != nil && msg.Type() == MessageDictionaryBatch {
- if _, r.err = readDictionary(&r.memo, msg.meta,
bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); r.err != nil {
+ if _, r.err = readDictionary(&r.memo, msg.meta, msg.body,
r.swapEndianness, r.mem); r.err != nil {
r.done = true
return false
}
@@ -252,7 +251,7 @@ func (r *Reader) next() bool {
return false
}
- r.rec = newRecord(r.schema, &r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem)
+ r.rec = newRecord(r.schema, &r.memo, msg.meta, msg.body, r.swapEndianness, r.mem)
return true
}
diff --git a/arrow/ipc/reader_test.go b/arrow/ipc/reader_test.go
index 792519b..a78bb75 100644
--- a/arrow/ipc/reader_test.go
+++ b/arrow/ipc/reader_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"testing"
+ "unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
@@ -96,6 +97,79 @@ func TestReaderCheckedAllocator(t *testing.T) {
require.NoError(t, err)
}
+func TestMappedReader(t *testing.T) {
+ pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer pool.AssertSize(t, 0)
+ schema := arrow.NewSchema([]arrow.Field{{Name: "f1", Type: arrow.PrimitiveTypes.Int32}}, nil)
+ b := array.NewRecordBuilder(pool, schema)
+ defer b.Release()
+ b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4}, []bool{true, true, false, true})
+
+ rec1 := b.NewRecord()
+ defer rec1.Release()
+
+ tbl := array.NewTableFromRecords(schema, []arrow.Record{rec1})
+ defer tbl.Release()
+
+ var buf bytes.Buffer
+ ipcWriter, err := NewFileWriter(&buf, WithAllocator(pool), WithSchema(schema))
+ require.NoError(t, err)
+
+ t.Log("Reading data before")
+ tr := array.NewTableReader(tbl, 2)
+ defer tr.Release()
+
+ n := 0
+ for tr.Next() {
+ rec := tr.Record()
+ for i, col := range rec.Columns() {
+ t.Logf("rec[%d][%q]: %v nulls:%v\n", n,
+ rec.ColumnName(i), col, col.NullBitmapBytes())
+ }
+ n++
+ err := ipcWriter.Write(rec)
+ if err != nil {
+ panic(err)
+ }
+ }
+ require.NoError(t, ipcWriter.Close())
+
+ t.Log("Reading data after")
+ rdr, err := NewMappedFileReader(buf.Bytes(), WithAllocator(pool))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ rec, err := rdr.RecordAt(0)
+ require.NoError(t, err)
+ defer rec.Release()
+
+ // get offset and block info into the buffer bytes
+ blk, err := rdr.r.block(nil, &rdr.footer, 0)
+ require.NoError(t, err)
+
+ // determine pointer location of bytes for the first buffer
+ // no nulls, so only one buffer
+ start := unsafe.Pointer(unsafe.SliceData(buf.Bytes()))
+ loc := unsafe.Add(unsafe.Add(start, blk.Offset()), blk.Meta())
+ // ensure our buffer pointer matches the calculated pointer
+ assert.Equal(t, (*byte)(loc), unsafe.SliceData(rec.Column(0).Data().Buffers()[1].Bytes()))
+
+ rec, err = rdr.RecordAt(1)
+ require.NoError(t, err)
+ defer rec.Release()
+
+ blk, err = rdr.r.block(nil, &rdr.footer, 1)
+ require.NoError(t, err)
+
+ start = unsafe.Pointer(unsafe.SliceData(buf.Bytes()))
+ loc = unsafe.Add(unsafe.Add(start, blk.Offset()), blk.Meta())
+ // check pointer of validity bitmap location
+ assert.Equal(t, (*byte)(loc), unsafe.SliceData(rec.Column(0).Data().Buffers()[0].Bytes()))
+ // calculate and check pointer of data buffer
+ loc = unsafe.Add(loc, rec.Column(0).Data().Buffers()[0].Len())
+ assert.Equal(t, (*byte)(loc), unsafe.SliceData(rec.Column(0).Data().Buffers()[1].Bytes()))
+}
+
func BenchmarkIPC(b *testing.B) {
alloc := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer alloc.AssertSize(b, 0)