This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch jq/295
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git

commit b708cbe7b923b1a0ca9e31338d8369bc1733e349
Author: Jacob Quinn <[email protected]>
AuthorDate: Sat Mar 5 18:55:04 2022 -0700

    Alternative fix for #295.
    
    We already have a utility defined (`OrderedChannel`) that we use when
    writing record batches to ensure batches get _written_ in the same order
    they are provided; it makes sense to use the same utility when reading
    to ensure incoming record batches are _read_ in the appropriate order.
---
 src/table.jl | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index 88d65b1..8ca59e3 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -272,7 +272,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     sch = nothing
     dictencodings = Dict{Int64, DictEncoding}() # dictionary id => DictEncoding
     dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
-    tsks = Channel{Task}(Inf)
+    tsks = OrderedChannel{Task}(Inf)
     tsk = Threads.@spawn begin
         i = 1
         for tsk in tsks
@@ -292,6 +292,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
         end
     end
     anyrecordbatches = false
+    batchindex = 1
     for blob in blobs
         bytes, pos, len = blob.bytes, blob.pos, blob.len
         if len > 24 &&
@@ -348,7 +349,8 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
                 @debug 1 "parsing record batch message: compression = $(header.compression)"
                 put!(tsks, Threads.@spawn begin
                     collect(VectorIterator(sch, batch, dictencodings, convert))
-                end)
+                end, batchindex)
+                batchindex += 1
             else
                 throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
             end

Reply via email to