This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch fix-580-non-seekable-io
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git

commit 4f58e47e59ac1909326702d45f102ae86fda18d7
Author: Jacob Quinn <[email protected]>
AuthorDate: Thu Jan 15 17:20:09 2026 -0700

    Fix non-seekable I/O support for Arrow.Stream (#580)
    
    When reading Arrow data from non-seekable streams (FIFOs, pipes, sockets),
    `Arrow.Stream`/`Arrow.Table` would fail because `Mmap.mmap()` doesn't work
    on these stream types.
    
    Changes:
    - Modified `tobytes(io::IOStream)` to detect non-seekable (or zero-size)
      streams and fall back to `Base.read(io)` instead of `Mmap.mmap(io)`
    - Modified `Base.write(io::IO, msg::Message, ...)` to handle non-seekable
      outputs by wrapping `position(io)` in try-catch and skipping block
      tracking for non-seekable streams (only needed for file format footer)
    
    This enables Arrow data to be read from process pipes, FIFOs, and other
    non-seekable I/O sources.
    
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
 src/table.jl | 21 ++++++++++++++++++++-
 src/write.jl | 18 ++++++++++++++----
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index de8bfc3..eb71cc5 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -25,7 +25,26 @@ ArrowBlob(bytes::Vector{UInt8}, pos::Int, len::Nothing) =
 
 tobytes(bytes::Vector{UInt8}) = bytes
 tobytes(io::IO) = Base.read(io)
-tobytes(io::IOStream) = Mmap.mmap(io)
+
+function tobytes(io::IOStream)
+    # Prefer mmap for seekable, non-empty streams (regular files); fall back
+    # to read() for non-seekable streams (FIFOs, pipes, sockets) and for
+    # zero-size files, where mmap would fail or return no bytes.
+    try
+        pos = position(io)  # probe seekability: expected to throw if not seekable
+        seek(io, pos)
+        if filesize(io) > 0
+            return Mmap.mmap(io)
+        end
+    catch err
+        # Never swallow a user interrupt; any other failure of the probe (or
+        # of mmap itself) deliberately degrades to a plain read below.
+        err isa InterruptException && rethrow()
+    end
+    # Non-seekable, zero-size, or mmap failed: read the remaining bytes
+    return Base.read(io)
+end
+
 tobytes(file_path) = open(tobytes, file_path, "r")
 
 struct BatchIterator
diff --git a/src/write.jl b/src/write.jl
index 4c3800f..ce1c583 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -567,10 +567,20 @@ function Base.write(io::IO, msg::Message, blocks, sch, alignment)
     metalen = padding(length(msg.msgflatbuf), alignment)
     @debug "writing message: metalen = $metalen, bodylen = $(msg.bodylen), 
isrecordbatch = $(msg.isrecordbatch), headerType = $(msg.headerType)"
     if msg.blockmsg
-        push!(
-            blocks[msg.isrecordbatch ? 1 : 2],
-            Block(position(io), metalen + 8, msg.bodylen),
-        )
+        # Track block positions for file format footer
+        # Skip for non-seekable streams (streaming to FIFOs, sockets, etc.)
+        # where position() would fail - block tracking is only needed for file 
format
+        pos = try
+            position(io)
+        catch
+            nothing  # Non-seekable stream, skip block tracking
+        end
+        if pos !== nothing
+            push!(
+                blocks[msg.isrecordbatch ? 1 : 2],
+                Block(pos, metalen + 8, msg.bodylen),
+            )
+        end
     end
     # now write the final message spec out
     # continuation byte

Reply via email to