quinnj commented on a change in pull request #277:
URL: https://github.com/apache/arrow-julia/pull/277#discussion_r820173490
##########
File path: src/write.jl
##########
@@ -51,131 +51,213 @@ Supported keyword arguments to `Arrow.write` include:
"""
function write end
-write(io_or_file; kw...) = x -> write(io_or_file, x; kw...)
+struct Message
+ msgflatbuf
+ columns
+ bodylen
+ isrecordbatch::Bool
+ blockmsg::Bool
+ headerType
+end
-function write(file_path, tbl; metadata=getmetadata(tbl), colmetadata=nothing,
largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor,
ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false,
dictencodenested::Bool=false, alignment::Int=8,
maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=true)
- open(file_path, "w") do io
- write(io, tbl, file, largelists, compress, denseunions, dictencode,
dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
- end
- return file_path
+struct Block
+ offset::Int64
+ metaDataLength::Int32
+ bodyLength::Int64
end
-function write(io::IO, tbl; metadata=getmetadata(tbl), colmetadata=nothing,
largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor,
ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false,
dictencodenested::Bool=false, alignment::Int=8,
maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=false)
- return write(io, tbl, file, largelists, compress, denseunions, dictencode,
dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
+mutable struct Writer{T<:IO}
+ io::T
+ closeio::Bool
+ compress::Union{Nothing,LZ4FrameCompressor,Vector{LZ4FrameCompressor},ZstdCompressor,Vector{ZstdCompressor}}
+ writetofile::Bool
+ largelists::Bool
+ denseunions::Bool
+ dictencode::Bool
+ dictencodenested::Bool
+ threaded::Bool
+ alignment::Int32
+ maxdepth::Int64
+ meta::Union{Nothing,Base.ImmutableDict{String,String}}
+ colmeta::Union{Nothing,Base.ImmutableDict{Symbol,Base.ImmutableDict{String,String}}}
+ msgs::OrderedChannel{Message}
+ schema::Ref{Tables.Schema}
+ firstcols::Ref{Any}
+ dictencodings::Dict{Int64, Any}
+ blocks::NTuple{2, Vector{Block}}
+ task::Task
+ anyerror::Threads.Atomic{Bool}
+ errorref::Ref{Any}
+ partition_count::Int32
+ isclosed::Bool
end
-function write(io, source, writetofile, largelists, compress, denseunions,
dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta)
+function Base.open(Writer, io::T,
compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}},
writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool,
dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer,
meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where
{T<:IO}
if ntasks < 1
throw(ArgumentError("ntasks keyword argument must be > 0; pass `ntasks=1` to disable multithreaded writing"))
end
- if compress === :lz4
- compress = LZ4_FRAME_COMPRESSOR
- elseif compress === :zstd
- compress = ZSTD_COMPRESSOR
- elseif compress isa Symbol
- throw(ArgumentError("unsupported compress keyword argument value:
$compress. Valid values include `:lz4` or `:zstd`"))
- end
- # TODO: we're probably not threadsafe if user passes own single compressor
instance + ntasks > 1
- # if ntasks > 1 && compres !== nothing && !(compress isa Vector)
- # compress = Threads.resize_nthreads!([compress])
- # end
- if writetofile
- @debug 1 "starting write of arrow formatted file"
- Base.write(io, "ARROW1\0\0")
- end
msgs = OrderedChannel{Message}(ntasks)
- # build messages
- sch = Ref{Tables.Schema}()
+ schema = Ref{Tables.Schema}()
firstcols = Ref{Any}()
dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
blocks = (Block[], Block[])
# start message writing from channel
threaded = ntasks > 1
- tsk = threaded ? (Threads.@spawn for msg in msgs
- Base.write(io, msg, blocks, sch, alignment)
+ task = threaded ? (Threads.@spawn for msg in msgs
+ Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs
- Base.write(io, msg, blocks, sch, alignment)
+ Base.write(io, msg, blocks, schema, alignment)
end)
anyerror = Threads.Atomic{Bool}(false)
errorref = Ref{Any}()
- @sync for (i, tbl) in enumerate(Tables.partitions(source))
- if anyerror[]
- @error "error writing arrow data on partition = $(errorref[][3])"
exception=(errorref[][1], errorref[][2])
- error("fatal error writing arrow data")
- end
- @debug 1 "processing table partition i = $i"
+ meta = _normalizemeta(meta)
+ colmeta = _normalizecolmeta(colmeta)
+ Writer{T}(io, closeio, compress, writetofile, largelists, denseunions, dictencode, dictencodenested, threaded, alignment, maxdepth, meta, colmeta, msgs, schema, firstcols, dictencodings, blocks, task, anyerror, errorref, 1, false)
Review comment:
```suggestion
return Writer{T}(io, closeio, compress, writetofile, largelists, denseunions, dictencode, dictencodenested, threaded, alignment, maxdepth, meta, colmeta, msgs, schema, firstcols, dictencodings, blocks, task, anyerror, errorref, 1, false)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]