This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch fix-580-non-seekable-io
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git

commit 4f58e47e59ac1909326702d45f102ae86fda18d7
Author: Jacob Quinn <[email protected]>
AuthorDate: Thu Jan 15 17:20:09 2026 -0700

    Fix non-seekable I/O support for Arrow.Stream (#580)

    When reading Arrow data from non-seekable streams (FIFOs, pipes, sockets),
    `Arrow.Stream`/`Arrow.Table` would fail because `Mmap.mmap()` doesn't work
    on these stream types.

    Changes:
    - Modified `tobytes(io::IOStream)` to detect non-seekable streams and
      fall back to `Base.read(io)` instead of `Mmap.mmap(io)`
    - Modified `Base.write(io::IO, msg::Message, ...)` to handle non-seekable
      outputs by wrapping `position(io)` in try-catch and skipping block
      tracking for non-seekable streams (only needed for file format footer)

    This enables Arrow data to be read from process pipes, FIFOs, and other
    non-seekable I/O sources.

    Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
 src/table.jl | 21 ++++++++++++++++++++-
 src/write.jl | 18 ++++++++++++++----
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index de8bfc3..eb71cc5 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -25,7 +25,26 @@ ArrowBlob(bytes::Vector{UInt8}, pos::Int, len::Nothing) =
 
 tobytes(bytes::Vector{UInt8}) = bytes
 tobytes(io::IO) = Base.read(io)
-tobytes(io::IOStream) = Mmap.mmap(io)
+
+function tobytes(io::IOStream)
+    # Try to use mmap for seekable streams (regular files)
+    # Fall back to read() for non-seekable streams (FIFOs, pipes, etc.)
+    # where mmap would return empty bytes or fail
+    try
+        # Check if stream is seekable by testing position/seek
+        pos = position(io)
+        seek(io, pos)
+        # Also check filesize - FIFOs report size 0
+        if filesize(io) > 0
+            return Mmap.mmap(io)
+        end
+    catch
+        # Not seekable, fall through to read
+    end
+    # Non-seekable or zero-size: read all bytes
+    return Base.read(io)
+end
+
 tobytes(file_path) = open(tobytes, file_path, "r")
 
 struct BatchIterator
diff --git a/src/write.jl b/src/write.jl
index 4c3800f..ce1c583 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -567,10 +567,20 @@ function Base.write(io::IO, msg::Message, blocks, sch, alignment)
     metalen = padding(length(msg.msgflatbuf), alignment)
     @debug "writing message: metalen = $metalen, bodylen = $(msg.bodylen), isrecordbatch = $(msg.isrecordbatch), headerType = $(msg.headerType)"
     if msg.blockmsg
-        push!(
-            blocks[msg.isrecordbatch ? 1 : 2],
-            Block(position(io), metalen + 8, msg.bodylen),
-        )
+        # Track block positions for file format footer
+        # Skip for non-seekable streams (streaming to FIFOs, sockets, etc.)
+        # where position() would fail - block tracking is only needed for file format
+        pos = try
+            position(io)
+        catch
+            nothing # Non-seekable stream, skip block tracking
+        end
+        if pos !== nothing
+            push!(
+                blocks[msg.isrecordbatch ? 1 : 2],
+                Block(pos, metalen + 8, msg.bodylen),
+            )
+        end
     end
     # now write the final message spec out
     # continuation byte
