baumgold commented on code in PR #397:
URL: https://github.com/apache/arrow-julia/pull/397#discussion_r1134720104


##########
src/arraytypes/arraytypes.jl:
##########
@@ -35,14 +35,19 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), 
ded=DictEncoding[], meta=g
     @debugv 2 "converting top-level column to arrow format: col = 
$(typeof(x)), compression = $compression, kw = $(kw.data)"

Review Comment:
   This function should probably be broken down into a few smaller functions 
for clarity:
   
   ```julia
   
   # Compression helpers applied to an already-converted ArrowVector.
   # They use a name distinct from `toarrowvector` so that their positional
   # `compression` argument cannot collide with `toarrowvector(x, i=1, ...)`'s
   # optional positional parameters during dispatch.

   # No compression requested: return the vector unchanged.
   maybecompress(a::ArrowVector, ::Nothing) = a

   # Per-thread LZ4 compressors: use this thread's slot and hold that slot's lock,
   # because indexing by `threadid()` alone may not be safe under Julia > 1.8.
   function maybecompress(a::ArrowVector,
                          compression::AbstractVector{LZ4FrameCompressor})
       tid = Threads.threadid()
       return lock(LZ4_FRAME_COMPRESSOR_LOCK[tid]) do
           compress(Meta.CompressionTypes.LZ4_FRAME, compression[tid], a)
       end
   end

   # A single LZ4 compressor supplied directly: compress with it as-is.
   function maybecompress(a::ArrowVector, compression::LZ4FrameCompressor)
       return compress(Meta.CompressionTypes.LZ4_FRAME, compression, a)
   end

   # Per-thread ZSTD compressors: same slot-plus-lock scheme as the LZ4 case.
   function maybecompress(a::ArrowVector,
                          compression::AbstractVector{ZstdCompressor})
       tid = Threads.threadid()
       return lock(ZSTD_COMPRESSOR_LOCK[tid]) do
           compress(Meta.CompressionTypes.ZSTD, compression[tid], a)
       end
   end

   # A single ZSTD compressor supplied directly: compress with it as-is.
   function maybecompress(a::ArrowVector, compression::ZstdCompressor)
       return compress(Meta.CompressionTypes.ZSTD, compression, a)
   end

   """
       toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x);
                     compression=nothing, kw...)

   Convert top-level column `x` to its Arrow representation with `arrowvector`
   (forwarding `compression` and any other keywords), then compress the result
   with `maybecompress` according to `compression`. Returns the (possibly
   compressed) vector.
   """
   function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[],
                          meta=getmetadata(x);
                          compression::Union{Nothing, Vector{LZ4FrameCompressor},
                                             LZ4FrameCompressor, Vector{ZstdCompressor},
                                             ZstdCompressor}=nothing,
                          kw...)
       @debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
       @debugv 3 x
       A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
       A = maybecompress(A, compression)
       @debugv 2 "converted top-level column to arrow format: $(typeof(A))"
       @debugv 3 A
       return A
   end
   
   
   ```



##########
src/Arrow.jl:
##########
@@ -73,6 +73,13 @@ include("show.jl")
 
 const LZ4_FRAME_COMPRESSOR = LZ4FrameCompressor[]
 const ZSTD_COMPRESSOR = ZstdCompressor[]
+const LZ4_FRAME_DECOMPRESSOR = LZ4FrameDecompressor[]
+const ZSTD_DECOMPRESSOR = ZstdDecompressor[]
+# add locks for multithreaded (de/)compression (because we index by threadid 
which might not be safe under Julia >1.8)
+const LZ4_FRAME_COMPRESSOR_LOCK = ReentrantLock[]

Review Comment:
   A single thread can only do one thing at a time, so maybe we could use a
single lock per thread, shared among all compressors and decompressors?



##########
src/Arrow.jl:
##########
@@ -82,6 +89,19 @@ function __init__()
         lz4 = LZ4FrameCompressor(; compressionlevel=4)
         CodecLz4.TranscodingStreams.initialize(lz4)
         push!(LZ4_FRAME_COMPRESSOR, lz4)
+        # Locks
+        push!(LZ4_FRAME_COMPRESSOR_LOCK, ReentrantLock())
+        push!(ZSTD_COMPRESSOR_LOCK, ReentrantLock())
+        # Decompressors
+        zstdd = ZstdDecompressor()
+        CodecZstd.TranscodingStreams.initialize(zstdd)

Review Comment:
   Maybe we should be lazier about initializing these, since I believe
they're quite expensive. Shall we initialize them only as needed? For
example, if a user is only decompressing (reading) data, there's no reason
to pay the cost of initializing compressors (and vice versa). The same applies
to LZ4 vs. ZSTD: if a user only uses one, there's no need to initialize both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to