This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new c011888  Fix poor performance of table reading when many record batches are involved (#570)
c011888 is described below

commit c011888f676b41805f6833c3662fbbef02b4cbe3
Author: Jacob Quinn <[email protected]>
AuthorDate: Fri Oct 24 16:10:15 2025 -0600

    Fix poor performance of table reading when many record batches are involved (#570)
    
    Fixes #528. Alternative to #568
    
    cc: @KristofferC
---
 src/table.jl | 85 +++++++++++++++++++++++++++++++++++++++---------------------
 src/utils.jl |  6 +++++
 2 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index fe9206b..de8bfc3 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; 
convert::Bool=true)
     sch = nothing
     dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary 
id => DictEncoding
     dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
-    sync = OrderedSynchronizer()
-    tsks = Channel{Any}(Inf)
-    tsk = @wkspawn begin
-        i = 1
-        for cols in tsks
-            if i == 1
-                foreach(x -> push!(columns(t), x), cols)
-            elseif i == 2
-                foreach(1:length(cols)) do i
-                    columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
-                end
-            else
-                foreach(1:length(cols)) do i
-                    append!(columns(t)[i], cols[i])
-                end
-            end
-            i += 1
-        end
-    end
-    anyrecordbatches = false
+    # we'll grow/add a record batch set of columns as they're constructed
+    # must be holding the lock while growing/adding
+    # starts at 0-length because we don't know how many record batches there 
will be
+    rb_cols = []
+    rb_cols_lock = ReentrantLock()
     rbi = 1
-    @sync for blob in blobs
+    tasks = Task[]
+    for blob in blobs
         for batch in BatchIterator(blob)
             # store custom_metadata of batch.msg?
             header = batch.msg.header
@@ -578,30 +564,49 @@ function Table(blobs::Vector{ArrowBlob}; 
convert::Bool=true)
                 end # lock
                 @debug "parsed dictionary batch message: id=$id, 
data=$values\n"
             elseif header isa Meta.RecordBatch
-                anyrecordbatches = true
                 @debug "parsing record batch message: compression = 
$(header.compression)"
-                @wkspawn begin
-                    cols =
-                        collect(VectorIterator(sch, $batch, 
dictencodingslockable, convert))
-                    put!(() -> put!(tsks, cols), sync, $(rbi))
-                end
+                push!(
+                    tasks,
+                    collect_cols!(
+                        rbi,
+                        rb_cols_lock,
+                        rb_cols,
+                        sch,
+                        batch,
+                        dictencodingslockable,
+                        convert,
+                    ),
+                )
                 rbi += 1
             else
                 throw(ArgumentError("unsupported arrow message type: 
$(typeof(header))"))
             end
         end
     end
-    close(tsks)
-    wait(tsk)
+    _waitall(tasks)
     lu = lookup(t)
     ty = types(t)
     # 158; some implementations may send 0 record batches
-    if !anyrecordbatches && !isnothing(sch)
+    # no more multithreading, so no need to take the lock now
+    if length(rb_cols) == 0 && !isnothing(sch)
         for field in sch.fields
             T = juliaeltype(field, buildmetadata(field), convert)
             push!(columns(t), T[])
         end
     end
+    if length(rb_cols) > 0
+        foreach(x -> push!(columns(t), x), rb_cols[1])
+    end
+    if length(rb_cols) > 1
+        foreach(enumerate(rb_cols[2])) do (i, x)
+            columns(t)[i] = ChainedVector([columns(t)[i], x])
+        end
+        foreach(3:length(rb_cols)) do j
+            foreach(enumerate(rb_cols[j])) do (i, x)
+                append!(columns(t)[i], x)
+            end
+        end
+    end
     for (nm, col) in zip(names(t), columns(t))
         lu[nm] = col
         push!(ty, eltype(col))
@@ -610,6 +615,26 @@ function Table(blobs::Vector{ArrowBlob}; 
convert::Bool=true)
     return t
 end
 
+function collect_cols!(  # spawn a Task that collects one record batch's columns into rb_cols[rbi]; returns that Task
+    rbi,  # record batch counter (starts at 1, +1 per record batch message); the rb_cols slot this batch fills
+    rb_cols_lock,  # lock that must be held while growing or writing rb_cols
+    rb_cols,  # vector shared by all batch tasks; entry j receives batch j's collected columns
+    sch,  # schema used to decode the batch; passed as an argument, so the task uses the value current at spawn time
+    batch,  # record batch message to decode
+    dictencodingslockable,  # Lockable dict of dictionary id => DictEncoding, forwarded to VectorIterator
+    convert,  # forwarded unchanged to VectorIterator
+)
+    @wkspawn begin  # last expression, so the spawned Task is this function's return value
+        cols = collect(VectorIterator(sch, batch, dictencodingslockable, 
convert))
+        @lock rb_cols_lock begin  # other batch tasks mutate rb_cols concurrently
+            if length(rb_cols) < rbi  # batches can finish out of order: grow so slot rbi exists
+                resize!(rb_cols, rbi)  # new slots of this untyped vector stay #undef until their own task stores
+            end
+            rb_cols[rbi] = cols  # store by rbi so final order follows message order, not completion order
+        end
+    end
+end
+
 function getdictionaries!(dictencoded, field)
     d = field.dictionary
     if d !== nothing
diff --git a/src/utils.jl b/src/utils.jl
index 419bd39..8e2dfee 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -28,6 +28,12 @@ function writezeros(io::IO, n::Integer)
     s
 end
 
+if isdefined(Base, :waitall)  # use Base.waitall when the running Julia provides it
+    const _waitall = waitall  # NOTE(review): unqualified name assumes Base exports waitall; `Base.waitall` would not rely on that
+else
+    _waitall(tasks) = foreach(wait, tasks)  # fallback: block on each task in order; wait throws if that task failed. NOTE(review): error type/timing on failure may differ from Base.waitall — confirm callers don't depend on it
+end
+
 # efficient writing of arrays
 writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)
 

Reply via email to