This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new 36f8958  enable incremental reads of arrow-formatted files (#408)
36f8958 is described below

commit 36f895860c046b8b10224cfad44bf43c0992c4bf
Author: Ben Baumgold <[email protected]>
AuthorDate: Fri May 26 16:12:02 2023 -0400

    enable incremental reads of arrow-formatted files (#408)
    
    This allows users to read Arrow files while they're being written even
    when not using IPC mode. This is accomplished by removing the
    requirement that the data must end with the magic "ARROW1".
---
 src/table.jl | 25 ++++++++-----------------
 src/utils.jl | 12 +-----------
 src/write.jl |  4 ++--
 3 files changed, 11 insertions(+), 30 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index 66fd584..b2d982d 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -30,6 +30,13 @@ tobytes(file_path) = open(tobytes, file_path, "r")
 struct BatchIterator
     bytes::Vector{UInt8}
     startpos::Int
+    function BatchIterator(blob::ArrowBlob)
+        bytes, pos, len = blob.bytes, blob.pos, blob.len
+        if len > 24 && _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES)
+            pos += 8 # skip past magic bytes + padding
+        end
+        new(bytes, pos)
+    end
 end
 
 """
@@ -110,16 +117,6 @@ end
 Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
 Base.eltype(::Type{Stream}) = Table
 
-function BatchIterator(blob::ArrowBlob)
-    bytes, pos, len = blob.bytes, blob.pos, blob.len
-    if len > 24 &&
-        _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
-        _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
-        pos += 8 # skip past magic bytes + padding
-    end
-    BatchIterator(bytes, pos)
-end
-
 function Base.iterate(x::Stream, (pos, id)=(1, 0))
     if isnothing(x.batchiterator)
         blob = x.inputs[x.inputindex]
@@ -355,13 +352,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     anyrecordbatches = false
     rbi = 1
     @sync for blob in blobs
-        bytes, pos, len = blob.bytes, blob.pos, blob.len
-        if len > 24 &&
-            _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
-            _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
-            pos += 8 # skip past magic bytes + padding
-        end
-        for batch in BatchIterator(bytes, pos)
+        for batch in BatchIterator(blob)
             # store custom_metadata of batch.msg?
             header = batch.msg.header
             if header isa Meta.Schema
diff --git a/src/utils.jl b/src/utils.jl
index 223c6c2..2fe31e9 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -73,7 +73,7 @@ end
 # count # of missing elements in an iterable
 nullcount(col) = count(ismissing, col)
 
-# like startswith/endswith for strings, but on byte buffers
+# like startswith for strings, but on byte buffers
 function _startswith(a::AbstractVector{UInt8}, pos::Integer, b::AbstractVector{UInt8})
     for i = 1:length(b)
         @inbounds check = a[pos + i - 1] == b[i]
@@ -82,16 +82,6 @@ function _startswith(a::AbstractVector{UInt8}, pos::Integer, b::AbstractVector{U
     return true
 end
 
-function _endswith(a::AbstractVector{UInt8}, endpos::Integer, b::AbstractVector{UInt8})
-    aoff = endpos - length(b) + 1
-    for i = 1:length(b)
-        @inbounds check = a[aoff] == b[i]
-        check || return false
-        aoff += 1
-    end
-    return true
-end
-
 # read a single element from a byte vector
 # copied from read(::IOBuffer, T) in Base
 function readbuffer(t::AbstractVector{UInt8}, pos::Integer, ::Type{T}) where {T}
diff --git a/src/write.jl b/src/write.jl
index 1c6975d..d982b09 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -189,7 +189,7 @@ function write(writer::Writer, source)
         if !isassigned(writer.firstcols)
             if writer.writetofile
                 @debugv 1 "starting write of arrow formatted file"
-                Base.write(writer.io, "ARROW1\0\0")
+                Base.write(writer.io, FILE_FORMAT_MAGIC_BYTES, b"\0\0")
             end
             meta = isnothing(writer.meta) ? getmetadata(source) : writer.meta
             cols = toarrowtable(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, meta, writer.colmeta)
@@ -358,7 +358,7 @@ function Base.write(io::IO, msg::Message, blocks, sch, alignment)
     end
     # now write the final message spec out
     # continuation byte
-    n = Base.write(io, 0xFFFFFFFF)
+    n = Base.write(io, CONTINUATION_INDICATOR_BYTES)
     # metadata length
     n += Base.write(io, Int32(metalen))
     # message flatbuffer

Reply via email to