This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new cc983da support reading empty files that are in the process of being written (#338)
cc983da is described below
commit cc983da97b835597af5db082c9426e275cf67352
Author: Ben Baumgold <[email protected]>
AuthorDate: Thu Oct 6 01:54:34 2022 -0400
support reading empty files that are in the process of being written (#338)
Co-authored-by: Ben Baumgold <[email protected]>
---
src/table.jl | 93 +++++++++++++++++++++++++++++---------------------------
test/runtests.jl | 6 ++++
2 files changed, 54 insertions(+), 45 deletions(-)
diff --git a/src/table.jl b/src/table.jl
index 88d65b1..259bac2 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -58,82 +58,85 @@ lazily converted to more friendly Julia defaults; by default, `convert=true`.
mutable struct Stream
inputs::Vector{ArrowBlob}
inputindex::Int
- batchiterator::BatchIterator
- pos::Int
+ batchiterator::Union{Nothing,BatchIterator}
names::Vector{Symbol}
types::Vector{Type}
- schema::Meta.Schema
+ schema::Union{Nothing,Meta.Schema}
dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding
dictencoded::Dict{Int64, Meta.Field} # dictionary id => field
convert::Bool
compression::Ref{Union{Symbol,Nothing}}
end
+function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
+ inputindex = 1
+ batchiterator = nothing
+ names = Symbol[]
+ types = Type[]
+ schema = nothing
+ dictencodings = Dict{Int64, DictEncoding}()
+ dictencoded = Dict{Int64, Meta.Field}()
+ compression = Ref{Union{Symbol,Nothing}}(nothing)
+ Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
+end
+
Tables.partitions(x::Stream) = x
Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
-# will detect whether we're reading a Stream from a file or stream
-function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
- dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
- dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
- blob = inputs[1]
+Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
+
+function BatchIterator(blob::ArrowBlob)
bytes, pos, len = blob.bytes, blob.pos, blob.len
if len > 24 &&
_startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
_endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
pos += 8 # skip past magic bytes + padding
end
- batchiterator = BatchIterator(bytes, pos)
- state = iterate(batchiterator)
- state === nothing && throw(ArgumentError("no arrow ipc messages found in provided input"))
- batch, (pos, id) = state
- schema = batch.msg.header
- schema isa Meta.Schema || throw(ArgumentError("first arrow ipc message MUST be a schema message"))
- # assert endianness?
- # store custom_metadata?
- names = Symbol[]
- types = Type[]
- for (i, field) in enumerate(schema.fields)
- push!(names, Symbol(field.name))
- push!(types, juliaeltype(field, buildmetadata(field.custom_metadata), convert))
- # recursively find any dictionaries for any fields
- getdictionaries!(dictencoded, field)
- @debug 1 "parsed column from schema: field = $field"
- end
- return Stream(inputs, 1, batchiterator, pos, names, types, schema, dictencodings, dictencoded, convert, Ref{Union{Symbol,Nothing}}(nothing))
+ BatchIterator(bytes, pos)
end
-Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
+function Base.iterate(x::Stream, (pos, id)=(1, 0))
+ if isnothing(x.batchiterator)
+ blob = x.inputs[x.inputindex]
+ x.batchiterator = BatchIterator(blob)
+ pos = x.batchiterator.startpos
+ end
-function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
columns = AbstractVector[]
compression = nothing
while true
state = iterate(x.batchiterator, (pos, id))
- if state === nothing
- # check for additional inputs
- while state === nothing
- x.inputindex += 1
- x.inputindex > length(x.inputs) && return nothing
- blob = x.inputs[x.inputindex]
- bytes, pos, len = blob.bytes, blob.pos, blob.len
- if len > 24 &&
- _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
- _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
- pos += 8 # skip past magic bytes + padding
- end
- x.batchiterator = BatchIterator(bytes, pos)
- state = iterate(x.batchiterator, (1, id))
- end
+ # check for additional inputs
+ while state === nothing
+ x.inputindex += 1
+ x.inputindex > length(x.inputs) && return nothing
+ blob = x.inputs[x.inputindex]
+ x.batchiterator = BatchIterator(blob)
+ pos = x.batchiterator.startpos
+ state = iterate(x.batchiterator, (pos, id))
end
batch, (pos, id) = state
header = batch.msg.header
+ if isnothing(x.schema) && !isa(header, Meta.Schema)
+ throw(ArgumentError("first arrow ipc message MUST be a schema message"))
+ end
if header isa Meta.Schema
- if header != x.schema
+ if isnothing(x.schema)
+ x.schema = header
+ # assert endianness?
+ # store custom_metadata?
+ for (i, field) in enumerate(x.schema.fields)
+ push!(x.names, Symbol(field.name))
+ push!(x.types, juliaeltype(field, buildmetadata(field.custom_metadata), x.convert))
+ # recursively find any dictionaries for any fields
+ getdictionaries!(x.dictencoded, field)
+ @debug 1 "parsed column from schema: field = $field"
+ end
+ elseif header != x.schema
throw(ArgumentError("mismatched schemas between different arrow batches: $(x.schema) != $header"))
end
elseif header isa Meta.DictionaryBatch
@@ -359,7 +362,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
lu = lookup(t)
ty = types(t)
# 158; some implementations may send 0 record batches
- if !anyrecordbatches
+ if !anyrecordbatches && !isnothing(sch)
for field in sch.fields
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
diff --git a/test/runtests.jl b/test/runtests.jl
index 4bd6c16..dd79475 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -527,6 +527,12 @@ t3 = Arrow.Table(Arrow.tobuffer(t2))
@test table.b == collect(b)
end
+# Empty input
+@test Arrow.Table(UInt8[]) isa Arrow.Table
+@test isempty(Tables.rows(Arrow.Table(UInt8[])))
+@test Arrow.Stream(UInt8[]) isa Arrow.Stream
+@test isempty(Tables.partitions(Arrow.Stream(UInt8[])))
+
end # @testset "misc"
end