This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new 36f8958 enable incremental reads of arrow-formatted files (#408)
36f8958 is described below
commit 36f895860c046b8b10224cfad44bf43c0992c4bf
Author: Ben Baumgold <[email protected]>
AuthorDate: Fri May 26 16:12:02 2023 -0400
enable incremental reads of arrow-formatted files (#408)
This allows users to read Arrow files while they're being written, even
when they are written in the Arrow file format rather than the IPC stream
format. This is accomplished by removing the requirement that the data
must end with the magic bytes "ARROW1".
---
src/table.jl | 25 ++++++++-----------------
src/utils.jl | 12 +-----------
src/write.jl | 4 ++--
3 files changed, 11 insertions(+), 30 deletions(-)
diff --git a/src/table.jl b/src/table.jl
index 66fd584..b2d982d 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -30,6 +30,13 @@ tobytes(file_path) = open(tobytes, file_path, "r")
struct BatchIterator
bytes::Vector{UInt8}
startpos::Int
+ function BatchIterator(blob::ArrowBlob)
+ bytes, pos, len = blob.bytes, blob.pos, blob.len
+ if len > 24 && _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES)
+ pos += 8 # skip past magic bytes + padding
+ end
+ new(bytes, pos)
+ end
end
"""
@@ -110,16 +117,6 @@ end
Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
Base.eltype(::Type{Stream}) = Table
-function BatchIterator(blob::ArrowBlob)
- bytes, pos, len = blob.bytes, blob.pos, blob.len
- if len > 24 &&
- _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
- _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
- pos += 8 # skip past magic bytes + padding
- end
- BatchIterator(bytes, pos)
-end
-
function Base.iterate(x::Stream, (pos, id)=(1, 0))
if isnothing(x.batchiterator)
blob = x.inputs[x.inputindex]
@@ -355,13 +352,7 @@ function Table(blobs::Vector{ArrowBlob};
convert::Bool=true)
anyrecordbatches = false
rbi = 1
@sync for blob in blobs
- bytes, pos, len = blob.bytes, blob.pos, blob.len
- if len > 24 &&
- _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
- _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
- pos += 8 # skip past magic bytes + padding
- end
- for batch in BatchIterator(bytes, pos)
+ for batch in BatchIterator(blob)
# store custom_metadata of batch.msg?
header = batch.msg.header
if header isa Meta.Schema
diff --git a/src/utils.jl b/src/utils.jl
index 223c6c2..2fe31e9 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -73,7 +73,7 @@ end
# count # of missing elements in an iterable
nullcount(col) = count(ismissing, col)
-# like startswith/endswith for strings, but on byte buffers
+# like startswith for strings, but on byte buffers
function _startswith(a::AbstractVector{UInt8}, pos::Integer,
b::AbstractVector{UInt8})
for i = 1:length(b)
@inbounds check = a[pos + i - 1] == b[i]
@@ -82,16 +82,6 @@ function _startswith(a::AbstractVector{UInt8}, pos::Integer,
b::AbstractVector{U
return true
end
-function _endswith(a::AbstractVector{UInt8}, endpos::Integer,
b::AbstractVector{UInt8})
- aoff = endpos - length(b) + 1
- for i = 1:length(b)
- @inbounds check = a[aoff] == b[i]
- check || return false
- aoff += 1
- end
- return true
-end
-
# read a single element from a byte vector
# copied from read(::IOBuffer, T) in Base
function readbuffer(t::AbstractVector{UInt8}, pos::Integer, ::Type{T}) where
{T}
diff --git a/src/write.jl b/src/write.jl
index 1c6975d..d982b09 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -189,7 +189,7 @@ function write(writer::Writer, source)
if !isassigned(writer.firstcols)
if writer.writetofile
@debugv 1 "starting write of arrow formatted file"
- Base.write(writer.io, "ARROW1\0\0")
+ Base.write(writer.io, FILE_FORMAT_MAGIC_BYTES, b"\0\0")
end
meta = isnothing(writer.meta) ? getmetadata(source) : writer.meta
cols = toarrowtable(tblcols, writer.dictencodings,
writer.largelists, writer.compress, writer.denseunions, writer.dictencode,
writer.dictencodenested, writer.maxdepth, meta, writer.colmeta)
@@ -358,7 +358,7 @@ function Base.write(io::IO, msg::Message, blocks, sch,
alignment)
end
# now write the final message spec out
# continuation byte
- n = Base.write(io, 0xFFFFFFFF)
+ n = Base.write(io, CONTINUATION_INDICATOR_BYTES)
# metadata length
n += Base.write(io, Int32(metalen))
# message flatbuffer