This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new cc983da  support reading empty files that are in the process of being written (#338)
cc983da is described below

commit cc983da97b835597af5db082c9426e275cf67352
Author: Ben Baumgold <[email protected]>
AuthorDate: Thu Oct 6 01:54:34 2022 -0400

    support reading empty files that are in the process of being written (#338)
    
    Co-authored-by: Ben Baumgold <[email protected]>
---
 src/table.jl     | 93 +++++++++++++++++++++++++++++---------------------------
 test/runtests.jl |  6 ++++
 2 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index 88d65b1..259bac2 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -58,82 +58,85 @@ lazily converted to more friendly Julia defaults; by default, `convert=true`.
 mutable struct Stream
     inputs::Vector{ArrowBlob}
     inputindex::Int
-    batchiterator::BatchIterator
-    pos::Int
+    batchiterator::Union{Nothing,BatchIterator}
     names::Vector{Symbol}
     types::Vector{Type}
-    schema::Meta.Schema
+    schema::Union{Nothing,Meta.Schema}
     dictencodings::Dict{Int64, DictEncoding} # dictionary id => DictEncoding
     dictencoded::Dict{Int64, Meta.Field} # dictionary id => field
     convert::Bool
     compression::Ref{Union{Symbol,Nothing}}
 end
 
+function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
+    inputindex = 1
+    batchiterator = nothing
+    names = Symbol[]
+    types = Type[]
+    schema = nothing
+    dictencodings = Dict{Int64, DictEncoding}()
+    dictencoded = Dict{Int64, Meta.Field}()
+    compression = Ref{Union{Symbol,Nothing}}(nothing)
+    Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
+end
+
 Tables.partitions(x::Stream) = x
 
 Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
 Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
 Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
 
-# will detect whether we're reading a Stream from a file or stream
-function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
-    dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
-    dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
-    blob = inputs[1]
+Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
+
+function BatchIterator(blob::ArrowBlob)
     bytes, pos, len = blob.bytes, blob.pos, blob.len
     if len > 24 &&
         _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
         _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
         pos += 8 # skip past magic bytes + padding
     end
-    batchiterator = BatchIterator(bytes, pos)
-    state = iterate(batchiterator)
-    state === nothing && throw(ArgumentError("no arrow ipc messages found in provided input"))
-    batch, (pos, id) = state
-    schema = batch.msg.header
-    schema isa Meta.Schema || throw(ArgumentError("first arrow ipc message MUST be a schema message"))
-    # assert endianness?
-    # store custom_metadata?
-    names = Symbol[]
-    types = Type[]
-    for (i, field) in enumerate(schema.fields)
-        push!(names, Symbol(field.name))
-        push!(types, juliaeltype(field, buildmetadata(field.custom_metadata), convert))
-        # recursively find any dictionaries for any fields
-        getdictionaries!(dictencoded, field)
-        @debug 1 "parsed column from schema: field = $field"
-    end
-    return Stream(inputs, 1, batchiterator, pos, names, types, schema, dictencodings, dictencoded, convert, Ref{Union{Symbol,Nothing}}(nothing))
+    BatchIterator(bytes, pos)
 end
 
-Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
+function Base.iterate(x::Stream, (pos, id)=(1, 0))
+    if isnothing(x.batchiterator)
+        blob = x.inputs[x.inputindex]
+        x.batchiterator = BatchIterator(blob)
+        pos = x.batchiterator.startpos
+    end
 
-function Base.iterate(x::Stream, (pos, id)=(x.pos, 1))
     columns = AbstractVector[]
     compression = nothing
 
     while true
         state = iterate(x.batchiterator, (pos, id))
-        if state === nothing
-            # check for additional inputs
-            while state === nothing
-                x.inputindex += 1
-                x.inputindex > length(x.inputs) && return nothing
-                blob = x.inputs[x.inputindex]
-                bytes, pos, len = blob.bytes, blob.pos, blob.len
-                if len > 24 &&
-                    _startswith(bytes, pos, FILE_FORMAT_MAGIC_BYTES) &&
-                    _endswith(bytes, pos + len - 1, FILE_FORMAT_MAGIC_BYTES)
-                    pos += 8 # skip past magic bytes + padding
-                end
-                x.batchiterator = BatchIterator(bytes, pos)
-                state = iterate(x.batchiterator, (1, id))
-            end
+        # check for additional inputs
+        while state === nothing
+            x.inputindex += 1
+            x.inputindex > length(x.inputs) && return nothing
+            blob = x.inputs[x.inputindex]
+            x.batchiterator = BatchIterator(blob)
+            pos = x.batchiterator.startpos
+            state = iterate(x.batchiterator, (pos, id))
         end
         batch, (pos, id) = state
         header = batch.msg.header
+        if isnothing(x.schema) && !isa(header, Meta.Schema)
+            throw(ArgumentError("first arrow ipc message MUST be a schema message"))
+        end
         if header isa Meta.Schema
-            if header != x.schema
+            if isnothing(x.schema)
+                x.schema = header
+                # assert endianness?
+                # store custom_metadata?
+                for (i, field) in enumerate(x.schema.fields)
+                    push!(x.names, Symbol(field.name))
+                    push!(x.types, juliaeltype(field, buildmetadata(field.custom_metadata), x.convert))
+                    # recursively find any dictionaries for any fields
+                    getdictionaries!(x.dictencoded, field)
+                    @debug 1 "parsed column from schema: field = $field"
+                end
+            elseif header != x.schema
                 throw(ArgumentError("mismatched schemas between different arrow batches: $(x.schema) != $header"))
             end
         elseif header isa Meta.DictionaryBatch
@@ -359,7 +362,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     lu = lookup(t)
     ty = types(t)
     # 158; some implementations may send 0 record batches
-    if !anyrecordbatches
+    if !anyrecordbatches && !isnothing(sch)
         for field in sch.fields
             T = juliaeltype(field, buildmetadata(field), convert)
             push!(columns(t), T[])
diff --git a/test/runtests.jl b/test/runtests.jl
index 4bd6c16..dd79475 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -527,6 +527,12 @@ t3 = Arrow.Table(Arrow.tobuffer(t2))
     @test table.b == collect(b)
 end
 
+# Empty input
+@test Arrow.Table(UInt8[]) isa Arrow.Table
+@test isempty(Tables.rows(Arrow.Table(UInt8[])))
+@test Arrow.Stream(UInt8[]) isa Arrow.Stream
+@test isempty(Tables.partitions(Arrow.Stream(UInt8[])))
+
 end # @testset "misc"
 
 end

Reply via email to