quinnj commented on a change in pull request #277:
URL: https://github.com/apache/arrow-julia/pull/277#discussion_r820173569



##########
File path: src/write.jl
##########
@@ -51,131 +51,213 @@ Supported keyword arguments to `Arrow.write` include:
 """
 function write end
 
-write(io_or_file; kw...) = x -> write(io_or_file, x; kw...)
+struct Message
+    msgflatbuf
+    columns
+    bodylen
+    isrecordbatch::Bool
+    blockmsg::Bool
+    headerType
+end
 
-function write(file_path, tbl; metadata=getmetadata(tbl), colmetadata=nothing, 
largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, 
ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, 
dictencodenested::Bool=false, alignment::Int=8, 
maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=true)
-    open(file_path, "w") do io
-        write(io, tbl, file, largelists, compress, denseunions, dictencode, 
dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
-    end
-    return file_path
+struct Block
+    offset::Int64
+    metaDataLength::Int32
+    bodyLength::Int64
 end
 
-function write(io::IO, tbl; metadata=getmetadata(tbl), colmetadata=nothing, 
largelists::Bool=false, compress::Union{Nothing, Symbol, LZ4FrameCompressor, 
ZstdCompressor}=nothing, denseunions::Bool=true, dictencode::Bool=false, 
dictencodenested::Bool=false, alignment::Int=8, 
maxdepth::Int=DEFAULT_MAX_DEPTH, ntasks=Inf, file::Bool=false)
-    return write(io, tbl, file, largelists, compress, denseunions, dictencode, 
dictencodenested, alignment, maxdepth, ntasks, metadata, colmetadata)
+mutable struct Writer{T<:IO}
+    io::T
+    closeio::Bool
+    
compress::Union{Nothing,LZ4FrameCompressor,Vector{LZ4FrameCompressor},ZstdCompressor,Vector{ZstdCompressor}}
+    writetofile::Bool
+    largelists::Bool
+    denseunions::Bool
+    dictencode::Bool
+    dictencodenested::Bool
+    threaded::Bool
+    alignment::Int32
+    maxdepth::Int64
+    meta::Union{Nothing,Base.ImmutableDict{String,String}}
+    
colmeta::Union{Nothing,Base.ImmutableDict{Symbol,Base.ImmutableDict{String,String}}}
+    msgs::OrderedChannel{Message}
+    schema::Ref{Tables.Schema}
+    firstcols::Ref{Any}
+    dictencodings::Dict{Int64, Any}
+    blocks::NTuple{2, Vector{Block}}
+    task::Task
+    anyerror::Threads.Atomic{Bool}
+    errorref::Ref{Any}
+    partition_count::Int32
+    isclosed::Bool
 end
 
-function write(io, source, writetofile, largelists, compress, denseunions, 
dictencode, dictencodenested, alignment, maxdepth, ntasks, meta, colmeta)
+function Base.open(Writer, io::T, 
compress::Union{Nothing,LZ4FrameCompressor,<:AbstractVector{LZ4FrameCompressor},ZstdCompressor,<:AbstractVector{ZstdCompressor}},
 writetofile::Bool, largelists::Bool, denseunions::Bool, dictencode::Bool, 
dictencodenested::Bool, alignment::Integer, maxdepth::Integer, ntasks::Integer, 
meta::Union{Nothing,Any}, colmeta::Union{Nothing,Any}, closeio::Bool) where 
{T<:IO}
     if ntasks < 1
         throw(ArgumentError("ntasks keyword argument must be > 0; pass 
`ntasks=1` to disable multithreaded writing"))
     end
-    if compress === :lz4
-        compress = LZ4_FRAME_COMPRESSOR
-    elseif compress === :zstd
-        compress = ZSTD_COMPRESSOR
-    elseif compress isa Symbol
-        throw(ArgumentError("unsupported compress keyword argument value: 
$compress. Valid values include `:lz4` or `:zstd`"))
-    end
-    # TODO: we're probably not threadsafe if user passes own single compressor 
instance + ntasks > 1
-    # if ntasks > 1 && compres !== nothing && !(compress isa Vector)
-    #     compress = Threads.resize_nthreads!([compress])
-    # end
-    if writetofile
-        @debug 1 "starting write of arrow formatted file"
-        Base.write(io, "ARROW1\0\0")
-    end
     msgs = OrderedChannel{Message}(ntasks)
-    # build messages
-    sch = Ref{Tables.Schema}()
+    schema = Ref{Tables.Schema}()
     firstcols = Ref{Any}()
     dictencodings = Dict{Int64, Any}() # Lockable{DictEncoding}
     blocks = (Block[], Block[])
     # start message writing from channel
     threaded = ntasks > 1
-    tsk = threaded ? (Threads.@spawn for msg in msgs
-        Base.write(io, msg, blocks, sch, alignment)
+    task = threaded ? (Threads.@spawn for msg in msgs
+        Base.write(io, msg, blocks, schema, alignment)
     end) : (@async for msg in msgs
-        Base.write(io, msg, blocks, sch, alignment)
+        Base.write(io, msg, blocks, schema, alignment)
     end)
     anyerror = Threads.Atomic{Bool}(false)
     errorref = Ref{Any}()
-    @sync for (i, tbl) in enumerate(Tables.partitions(source))
-        if anyerror[]
-            @error "error writing arrow data on partition = $(errorref[][3])" 
exception=(errorref[][1], errorref[][2])
-            error("fatal error writing arrow data")
-        end
-        @debug 1 "processing table partition i = $i"
+    meta = _normalizemeta(meta)
+    colmeta = _normalizecolmeta(colmeta)
+    Writer{T}(io, closeio, compress, writetofile, largelists, denseunions, 
dictencode, dictencodenested, threaded, alignment, maxdepth, meta, colmeta, 
msgs, schema, firstcols, dictencodings, blocks, task, anyerror, errorref, 1, 
false)
+end
+
+function Base.open(Writer, io::IO, compress::Symbol, args...)

Review comment:
       `Writer` in this case is just an argument name, not a type constraint; did you 
instead mean `::Type{Writer}`? Otherwise, I imagine this method commits type piracy on 
`Base.open`, since none of its argument types (`Any`, `IO`, `Symbol`, varargs) are owned by 
this package.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to