This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new c011888 Fix poor performance of table reading when many record batches are involved (#570)
c011888 is described below
commit c011888f676b41805f6833c3662fbbef02b4cbe3
Author: Jacob Quinn <[email protected]>
AuthorDate: Fri Oct 24 16:10:15 2025 -0600
Fix poor performance of table reading when many record batches are involved (#570)
Fixes #528. Alternative to #568
cc: @KristofferC
---
src/table.jl | 85 +++++++++++++++++++++++++++++++++++++++---------------------
src/utils.jl | 6 +++++
2 files changed, 61 insertions(+), 30 deletions(-)
diff --git a/src/table.jl b/src/table.jl
index fe9206b..de8bfc3 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob};
convert::Bool=true)
sch = nothing
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary
id => DictEncoding
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
- sync = OrderedSynchronizer()
- tsks = Channel{Any}(Inf)
- tsk = @wkspawn begin
- i = 1
- for cols in tsks
- if i == 1
- foreach(x -> push!(columns(t), x), cols)
- elseif i == 2
- foreach(1:length(cols)) do i
- columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
- end
- else
- foreach(1:length(cols)) do i
- append!(columns(t)[i], cols[i])
- end
- end
- i += 1
- end
- end
- anyrecordbatches = false
+ # we'll grow/add a record batch set of columns as they're constructed
+ # must be holding the lock while growing/adding
+ # starts at 0-length because we don't know how many record batches there will be
+ rb_cols = []
+ rb_cols_lock = ReentrantLock()
rbi = 1
- @sync for blob in blobs
+ tasks = Task[]
+ for blob in blobs
for batch in BatchIterator(blob)
# store custom_metadata of batch.msg?
header = batch.msg.header
@@ -578,30 +564,49 @@ function Table(blobs::Vector{ArrowBlob};
convert::Bool=true)
end # lock
@debug "parsed dictionary batch message: id=$id,
data=$values\n"
elseif header isa Meta.RecordBatch
- anyrecordbatches = true
@debug "parsing record batch message: compression =
$(header.compression)"
- @wkspawn begin
- cols =
- collect(VectorIterator(sch, $batch,
dictencodingslockable, convert))
- put!(() -> put!(tsks, cols), sync, $(rbi))
- end
+ push!(
+ tasks,
+ collect_cols!(
+ rbi,
+ rb_cols_lock,
+ rb_cols,
+ sch,
+ batch,
+ dictencodingslockable,
+ convert,
+ ),
+ )
rbi += 1
else
throw(ArgumentError("unsupported arrow message type:
$(typeof(header))"))
end
end
end
- close(tsks)
- wait(tsk)
+ _waitall(tasks)
lu = lookup(t)
ty = types(t)
# 158; some implementations may send 0 record batches
- if !anyrecordbatches && !isnothing(sch)
+ # _waitall(tasks) has returned, so no task still writes rb_cols; reading it needs no lock
+ if length(rb_cols) == 0 && !isnothing(sch)
for field in sch.fields
T = juliaeltype(field, buildmetadata(field), convert)
push!(columns(t), T[])
end
end
+ if length(rb_cols) > 0
+ foreach(x -> push!(columns(t), x), rb_cols[1])
+ end
+ if length(rb_cols) > 1
+ foreach(enumerate(rb_cols[2])) do (i, x)
+ columns(t)[i] = ChainedVector([columns(t)[i], x])
+ end
+ foreach(3:length(rb_cols)) do j
+ foreach(enumerate(rb_cols[j])) do (i, x)
+ append!(columns(t)[i], x)
+ end
+ end
+ end
for (nm, col) in zip(names(t), columns(t))
lu[nm] = col
push!(ty, eltype(col))
@@ -610,6 +615,26 @@ function Table(blobs::Vector{ArrowBlob};
convert::Bool=true)
return t
end
+function collect_cols!( # spawn a task that collects one record batch's columns into rb_cols[rbi]
+ rbi, # 1-based position of this batch among the record batches (its slot in rb_cols)
+ rb_cols_lock, # held for every resize!/store on rb_cols
+ rb_cols, # shared, growable vector; one entry per record batch
+ sch,
+ batch,
+ dictencodingslockable,
+ convert,
+)
+ @wkspawn begin # the spawned task is the return value; the caller pushes it onto `tasks`
+ cols = collect(VectorIterator(sch, batch, dictencodingslockable,
convert))
+ @lock rb_cols_lock begin
+ if length(rb_cols) < rbi # tasks may finish out of order: grow just enough to fit this slot
+ resize!(rb_cols, rbi)
+ end
+ rb_cols[rbi] = cols # storing by rbi keeps batch order independent of finish order
+ end
+ end
+end
+
function getdictionaries!(dictencoded, field)
d = field.dictionary
if d !== nothing
diff --git a/src/utils.jl b/src/utils.jl
index 419bd39..8e2dfee 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -28,6 +28,12 @@ function writezeros(io::IO, n::Integer)
s
end
+if isdefined(Base, :waitall) # Base.waitall only exists on newer Julia releases
+ const _waitall = waitall # NOTE(review): `Base.waitall` would not rely on the name being exported — confirm
+else # NOTE(review): failures may surface as a different exception type than via Base.waitall — confirm callers don't depend on it
+ _waitall(tasks) = foreach(wait, tasks) # wait on each task in order; `wait` throws at the first failed task
+end
+
# efficient writing of arrays
writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)