This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new 532b89b allow ntasks to be 0 and determine whether to use threads
based on nthreads rather than ntasks (#325)
532b89b is described below
commit 532b89b2c5740124cadca632a14ebb6cc9a0dca5
Author: Ben Baumgold <[email protected]>
AuthorDate: Fri Jun 10 03:11:07 2022 -0400
allow ntasks to be 0 and determine whether to use threads based on
nthreads rather than ntasks (#325)
Co-authored-by: Ben Baumgold <[email protected]>
---
src/write.jl | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/src/write.jl b/src/write.jl
index 5439b95..ae2da6d 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -46,7 +46,7 @@ Supported keyword arguments to `Arrow.write` include:
* `largelists::Bool=false`: causes list column types to be written with
Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets
will be used only if needed
* `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization
level; this is provided by default to prevent accidental infinite recursion
with mutually recursive data structures
* `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as
the table's schema's `custom_metadata` field; must either be `nothing` or an
iterable of `<:AbstractString` pairs.
- * `ntasks::Int`: number of concurrent threaded tasks to allow while writing
input partitions out as arrow record batches; default is no limit; to disable
multithreaded writing, pass `ntasks=1`
+ * `ntasks::Int`: number of buffered threaded tasks to allow while writing
input partitions out as arrow record batches; default is no limit; for
unbuffered writing, pass `ntasks=0`
* `file::Bool=false`: if an `io` argument is being written to, passing
`file=true` will cause the arrow file format to be written instead of just IPC
streaming
"""
function write end
@@ -135,16 +135,13 @@ mutable struct Writer{T<:IO}
end
function Base.open(::Type{Writer}, io::T,
compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}},
writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool,
dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer,
meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where
{T<:IO}
- if ntasks < 1
- throw(ArgumentError("ntasks keyword argument must be > 0; pass
`ntasks=1` to disable multithreaded writing"))
- end
msgs = OrderedChannel{Message}(ntasks)
schema = Ref{Tables.Schema}()
firstcols = Ref{Any}()
dictencodings = Dict{Int64,Any}() # Lockable{DictEncoding}
blocks = (Block[], Block[])
# start message writing from channel
- threaded = ntasks > 1
+ threaded = Threads.nthreads() > 1
task = threaded ? (Threads.@spawn for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs