This is an automated email from the ASF dual-hosted git repository. quinnj pushed a commit to branch jq/295 in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
commit b708cbe7b923b1a0ca9e31338d8369bc1733e349 Author: Jacob Quinn <[email protected]> AuthorDate: Sat Mar 5 18:55:04 2022 -0700 Alternative fix for #295. We already have a utility defined (`OrderedChannel`) that we use when writing record batches to ensure batches get _written_ in the same order they are provided; it makes sense to use the same utility when reading to ensure incoming record batches are _read_ in the appropriate order. --- src/table.jl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/table.jl b/src/table.jl index 88d65b1..8ca59e3 100644 --- a/src/table.jl +++ b/src/table.jl @@ -272,7 +272,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) sch = nothing dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field - tsks = Channel{Task}(Inf) + tsks = OrderedChannel{Task}(Inf) tsk = Threads.@spawn begin i = 1 for tsk in tsks @@ -292,6 +292,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) end end anyrecordbatches = false + batchindex = 1 for blob in blobs bytes, pos, len = blob.bytes, blob.pos, blob.len if len > 24 && @@ -348,7 +349,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) @debug 1 "parsing record batch message: compression = $(header.compression)" put!(tsks, Threads.@spawn begin collect(VectorIterator(sch, batch, dictencodings, convert)) - end) + end, batchindex) + batchindex += 1 else throw(ArgumentError("unsupported arrow message type: $(typeof(header))")) end
