This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new 532b89b  allow ntasks to be 0 and determine whether to use threads 
based on nthreads rather than ntasks (#325)
532b89b is described below

commit 532b89b2c5740124cadca632a14ebb6cc9a0dca5
Author: Ben Baumgold <[email protected]>
AuthorDate: Fri Jun 10 03:11:07 2022 -0400

    allow ntasks to be 0 and determine whether to use threads based on 
nthreads rather than ntasks (#325)
    
    Co-authored-by: Ben Baumgold <[email protected]>
---
 src/write.jl | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/write.jl b/src/write.jl
index 5439b95..ae2da6d 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -46,7 +46,7 @@ Supported keyword arguments to `Arrow.write` include:
   * `largelists::Bool=false`: causes list column types to be written with 
Int64 offset arrays; mainly for testing purposes; by default, Int64 offsets 
will be used only if needed
   * `maxdepth::Int=$DEFAULT_MAX_DEPTH`: deepest allowed nested serialization 
level; this is provided by default to prevent accidental infinite recursion 
with mutually recursive data structures
   * `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as 
the table's schema's `custom_metadata` field; must either be `nothing` or an 
iterable of `<:AbstractString` pairs.
-  * `ntasks::Int`: number of concurrent threaded tasks to allow while writing 
input partitions out as arrow record batches; default is no limit; to disable 
multithreaded writing, pass `ntasks=1`
+  * `ntasks::Int`: number of buffered threaded tasks to allow while writing 
input partitions out as arrow record batches; default is no limit; for 
unbuffered writing, pass `ntasks=0`
   * `file::Bool=false`: if a an `io` argument is being written to, passing 
`file=true` will cause the arrow file format to be written instead of just IPC 
streaming
 """
 function write end
@@ -135,16 +135,13 @@ mutable struct Writer{T<:IO}
 end
 
 function Base.open(::Type{Writer}, io::T, 
compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}},
 writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, 
dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, 
meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where 
{T<:IO}
-    if ntasks < 1
-        throw(ArgumentError("ntasks keyword argument must be > 0; pass 
`ntasks=1` to disable multithreaded writing"))
-    end
     msgs = OrderedChannel{Message}(ntasks)
     schema = Ref{Tables.Schema}()
     firstcols = Ref{Any}()
     dictencodings = Dict{Int64,Any}() # Lockable{DictEncoding}
     blocks = (Block[], Block[])
     # start message writing from channel
-    threaded = ntasks > 1
+    threaded = Threads.nthreads() > 1
     task = threaded ? (Threads.@spawn for msg in msgs
         Base.write(io, msg, blocks, schema, alignment)
     end) : (@async for msg in msgs

Reply via email to