This is an automated email from the ASF dual-hosted git repository. quinnj pushed a commit to branch jq-wkspawn in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
commit 61bee0fcd50fbe2ba5b097735dfd72aff36f76cb
Author: Jacob Quinn <[email protected]>
AuthorDate: Tue Jun 13 17:05:55 2023 -0600

    Use wkspawn from ConcurrentUtilities instead of Threads.spawn

    Should fix #336. For more context, see the
    [same fix](https://github.com/JuliaData/CSV.jl/commit/077e177e359c0b58e2d6db0b660a235a2ee54228)
    we made for this in CSV.jl. Basically, objects interpolated into or
    returned from spawned tasks can be unexpectedly kept alive long after
    the task has finished and the object should have been garbage-collected
    due to individual threads holding the most recent task as a reference.
    Using `@wkspawn` ensures tasks themselves don't hold references to any
    of these once they're done executing.
---
 src/append.jl | 4 ++--
 src/table.jl  | 4 ++--
 src/write.jl  | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/append.jl b/src/append.jl
index db7f1d3..a3c620f 100644
--- a/src/append.jl
+++ b/src/append.jl
@@ -130,7 +130,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
     blocks = (Block[], Block[])
     # start message writing from channel
     threaded = ntasks > 1
-    tsk = threaded ? (Threads.@spawn for msg in msgs
+    tsk = threaded ? (@wkspawn for msg in msgs
         Base.write(io, msg, blocks, sch, alignment)
     end) : (@async for msg in msgs
         Base.write(io, msg, blocks, sch, alignment)
@@ -151,7 +151,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
         end
         if threaded
-            Threads.@spawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
+            @wkspawn process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
         else
             @async process_partition(tbl_cols, dictencodings, largelists, compress, denseunions, dictencode, dictencodenested, maxdepth, sync, msgs, alignment, i, sch, errorref, anyerror, meta, colmeta)
         end
diff --git a/src/table.jl b/src/table.jl
index e61445c..daa6566 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -358,7 +358,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     dictencoded = Dict{Int64, Meta.Field}() # dictionary id => field
     sync = OrderedSynchronizer()
     tsks = Channel{Any}(Inf)
-    tsk = Threads.@spawn begin
+    tsk = @wkspawn begin
        i = 1
        for cols in tsks
            if i == 1
@@ -425,7 +425,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
            elseif header isa Meta.RecordBatch
                anyrecordbatches = true
                @debugv 1 "parsing record batch message: compression = $(header.compression)"
-                Threads.@spawn begin
+                @wkspawn begin
                    cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
                    put!(() -> put!(tsks, cols), sync, $(rbi))
                end
diff --git a/src/write.jl b/src/write.jl
index a6bd40b..6f14e5e 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -147,7 +147,7 @@ function Base.open(::Type{Writer}, io::T, compress::Union{Nothing,Symbol,LZ4Fram
     blocks = (Block[], Block[])
     # start message writing from channel
     threaded = Threads.nthreads() > 1
-    task = threaded ? (Threads.@spawn for msg in msgs
+    task = threaded ? (@wkspawn for msg in msgs
         Base.write(io, msg, blocks, schema, alignment)
     end) : (@async for msg in msgs
         Base.write(io, msg, blocks, schema, alignment)
@@ -202,7 +202,7 @@ function write(writer::Writer, source)
                put!(writer.msgs, recbatchmsg)
            else
                if writer.threaded
-                    Threads.@spawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
+                    @wkspawn process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
                else
                    @async process_partition(tblcols, writer.dictencodings, writer.largelists, writer.compress, writer.denseunions, writer.dictencode, writer.dictencodenested, writer.maxdepth, writer.sync, writer.msgs, writer.alignment, $(writer.partition_count), writer.schema, writer.errorref, writer.anyerror, writer.meta, writer.colmeta)
                end
