This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch jq-528
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git

commit fcd603ac144d4a732b075be9cedaa3be9ab9dc38
Author: Jacob Quinn <[email protected]>
AuthorDate: Fri Oct 24 14:29:08 2025 -0600

    Fix poor performance of table reading when many record batches are involved
---
 src/table.jl | 66 +++++++++++++++++++++++++++++++++---------------------------
 1 file changed, 36 insertions(+), 30 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index fe9206b..327afc6 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     sch = nothing
     dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary 
id => DictEncoding
     dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
-    sync = OrderedSynchronizer()
-    tsks = Channel{Any}(Inf)
-    tsk = @wkspawn begin
-        i = 1
-        for cols in tsks
-            if i == 1
-                foreach(x -> push!(columns(t), x), cols)
-            elseif i == 2
-                foreach(1:length(cols)) do i
-                    columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
-                end
-            else
-                foreach(1:length(cols)) do i
-                    append!(columns(t)[i], cols[i])
-                end
-            end
-            i += 1
-        end
-    end
-    anyrecordbatches = false
+    # we'll grow/add a record batch set of columns as they're constructed
+    # must be holding the lock while growing/adding
+    # starts at 0-length because we don't know how many record batches there 
will be
+    rb_cols = []
+    rb_cols_lock = ReentrantLock()
     rbi = 1
-    @sync for blob in blobs
+    tasks = Task[]
+    for blob in blobs
         for batch in BatchIterator(blob)
             # store custom_metadata of batch.msg?
             header = batch.msg.header
@@ -578,30 +564,38 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
                 end # lock
                 @debug "parsed dictionary batch message: id=$id, 
data=$values\n"
             elseif header isa Meta.RecordBatch
-                anyrecordbatches = true
                 @debug "parsing record batch message: compression = 
$(header.compression)"
-                @wkspawn begin
-                    cols =
-                        collect(VectorIterator(sch, $batch, 
dictencodingslockable, convert))
-                    put!(() -> put!(tsks, cols), sync, $(rbi))
-                end
+                push!(tasks, collect_cols!(rbi, rb_cols_lock, rb_cols, sch, 
batch, dictencodingslockable, convert))
                 rbi += 1
             else
                 throw(ArgumentError("unsupported arrow message type: 
$(typeof(header))"))
             end
         end
     end
-    close(tsks)
-    wait(tsk)
+    waitall(tasks)
     lu = lookup(t)
     ty = types(t)
     # 158; some implementations may send 0 record batches
-    if !anyrecordbatches && !isnothing(sch)
+    # no more multithreading, so no need to take the lock now
+    if length(rb_cols) == 0 && !isnothing(sch)
         for field in sch.fields
             T = juliaeltype(field, buildmetadata(field), convert)
             push!(columns(t), T[])
         end
     end
+    if length(rb_cols) > 0
+        foreach(x -> push!(columns(t), x), rb_cols[1])
+    end
+    if length(rb_cols) > 1
+        foreach(enumerate(rb_cols[2])) do (i, x)
+            columns(t)[i] = ChainedVector([columns(t)[i], x])
+        end
+        foreach(3:length(rb_cols)) do j
+            foreach(enumerate(rb_cols[j])) do (i, x)
+                append!(columns(t)[i], x)
+            end
+        end
+    end
     for (nm, col) in zip(names(t), columns(t))
         lu[nm] = col
         push!(ty, eltype(col))
@@ -610,6 +604,18 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     return t
 end
 
+function collect_cols!(rbi, rb_cols_lock, rb_cols, sch, batch, dictencodingslockable, convert) # spawn decoding of record batch number `rbi`; its columns are stored in rb_cols[rbi]
+    @wkspawn begin # the spawned Task is the return value; Table pushes it into `tasks` and waits on it (NOTE(review): that wait uses `waitall`, only in Base since Julia 1.12 — confirm compat or use `foreach(wait, tasks)`)
+        cols = collect(VectorIterator(sch, batch, dictencodingslockable, convert)) # materialize this batch's column vectors outside the critical section
+        @lock rb_cols_lock begin # rb_cols is shared by every batch task; growing/assigning must happen while holding the lock
+            if length(rb_cols) < rbi # tasks may finish out of order, so only grow when slot `rbi` doesn't exist yet
+                resize!(rb_cols, rbi) # rb_cols is an untyped `[]`, so new slots stay #undef until their own task fills them
+            end
+            rb_cols[rbi] = cols # index by batch number so rb_cols keeps the order batches were read, not completion order
+        end
+    end
+end
+
 function getdictionaries!(dictencoded, field)
     d = field.dictionary
     if d !== nothing

Reply via email to