This is an automated email from the ASF dual-hosted git repository.

quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git


The following commit(s) were added to refs/heads/main by this push:
     new fc8b899  Make DictEncoding reading threadsafe (#535)
fc8b899 is described below

commit fc8b899356a4298a25a08a9f314be0a49bb2bbc5
Author: Jacob Quinn <[email protected]>
AuthorDate: Tue Nov 26 21:22:15 2024 -0700

    Make DictEncoding reading threadsafe (#535)
    
    Fixes #534
---
 .github/workflows/ci.yml |   3 +-
 Project.toml             |   2 +-
 src/table.jl             | 176 +++++++++++++++++++++++++----------------------
 3 files changed, 95 insertions(+), 86 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5356fea..911910e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -76,7 +76,8 @@ jobs:
           - name: ArrowTypes.jl
             dir: './src/ArrowTypes'
         version:
-          - '1.6'
+          - 'min'
+          - 'lts'
           - '1' # automatically expands to the latest stable 1.x release of Julia
           - 'nightly'
         os:
diff --git a/Project.toml b/Project.toml
index b7fb92a..ffc5729 100644
--- a/Project.toml
+++ b/Project.toml
@@ -49,4 +49,4 @@ SentinelArrays = "1"
 Tables = "1.1"
 TimeZones = "1"
 TranscodingStreams = "0.9.12, 0.10, 0.11"
-julia = "1.6"
+julia = "1.9"
diff --git a/src/table.jl b/src/table.jl
index 6dfb9b4..6990341 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -70,7 +70,7 @@ mutable struct Stream
     names::Vector{Symbol}
     types::Vector{Type}
     schema::Union{Nothing,Meta.Schema}
-    dictencodings::Dict{Int64,DictEncoding} # dictionary id => DictEncoding
+    dictencodings::Lockable{Dict{Int64,DictEncoding}} # dictionary id => DictEncoding
     dictencoded::Dict{Int64,Meta.Field} # dictionary id => field
     convert::Bool
     compression::Ref{Union{Symbol,Nothing}}
@@ -82,7 +82,7 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
     names = Symbol[]
     types = Type[]
     schema = nothing
-    dictencodings = Dict{Int64,DictEncoding}()
+    dictencodings = Lockable(Dict{Int64,DictEncoding}())
     dictencoded = Dict{Int64,Meta.Field}()
     compression = Ref{Union{Symbol,Nothing}}(nothing)
     Stream(
@@ -210,8 +210,26 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
             if recordbatch.compression !== nothing
                 compression = recordbatch.compression
             end
-            if haskey(x.dictencodings, id) && header.isDelta
-                # delta
+            @lock x.dictencodings begin
+                dictencodings = x.dictencodings[]
+                if haskey(dictencodings, id) && header.isDelta
+                    # delta
+                    field = x.dictencoded[id]
+                    values, _, _ = build(
+                        field,
+                        field.type,
+                        batch,
+                        recordbatch,
+                        x.dictencodings,
+                        Int64(1),
+                        Int64(1),
+                        x.convert,
+                    )
+                    dictencoding = dictencodings[id]
+                    append!(dictencoding.data, values)
+                    continue
+                end
+                # new dictencoding or replace
                 field = x.dictencoded[id]
                 values, _, _ = build(
                     field,
@@ -223,32 +241,17 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
                     Int64(1),
                     x.convert,
                 )
-                dictencoding = x.dictencodings[id]
-                append!(dictencoding.data, values)
-                continue
-            end
-            # new dictencoding or replace
-            field = x.dictencoded[id]
-            values, _, _ = build(
-                field,
-                field.type,
-                batch,
-                recordbatch,
-                x.dictencodings,
-                Int64(1),
-                Int64(1),
-                x.convert,
-            )
-            A = ChainedVector([values])
-            S =
-                field.dictionary.indexType === nothing ? Int32 :
-                juliaeltype(field, field.dictionary.indexType, false)
-            x.dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
-                id,
-                A,
-                field.dictionary.isOrdered,
-                values.metadata,
-            )
+                A = ChainedVector([values])
+                S =
+                    field.dictionary.indexType === nothing ? Int32 :
+                    juliaeltype(field, field.dictionary.indexType, false)
+                dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
+                    id,
+                    A,
+                    field.dictionary.isOrdered,
+                    values.metadata,
+                )
+            end # lock
             @debug "parsed dictionary batch message: id=$id, data=$values\n"
         elseif header isa Meta.RecordBatch
             @debug "parsing record batch message: compression = $(header.compression)"
@@ -415,7 +418,7 @@ Table(inputs::Vector; kw...) =
 function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
     t = Table()
     sch = nothing
-    dictencodings = Dict{Int64,DictEncoding}() # dictionary id => DictEncoding
+    dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
     dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
     sync = OrderedSynchronizer()
     tsks = Channel{Any}(Inf)
@@ -465,65 +468,68 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
             elseif header isa Meta.DictionaryBatch
                 id = header.id
                 recordbatch = header.data
-                @debug "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
-                if haskey(dictencodings, id) && header.isDelta
-                    # delta
+                @info "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
+                @lock dictencodingslockable begin
+                    dictencodings = dictencodingslockable[]
+                    if haskey(dictencodings, id) && header.isDelta
+                        # delta
+                        field = dictencoded[id]
+                        values, _, _ = build(
+                            field,
+                            field.type,
+                            batch,
+                            recordbatch,
+                            dictencodingslockable,
+                            Int64(1),
+                            Int64(1),
+                            convert,
+                        )
+                        dictencoding = dictencodings[id]
+                        if typeof(dictencoding.data) <: ChainedVector
+                            append!(dictencoding.data, values)
+                        else
+                            A = ChainedVector([dictencoding.data, values])
+                            S =
+                                field.dictionary.indexType === nothing ? Int32 :
+                                juliaeltype(field, field.dictionary.indexType, false)
+                            dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
+                                id,
+                                A,
+                                field.dictionary.isOrdered,
+                                values.metadata,
+                            )
+                        end
+                        continue
+                    end
+                    # new dictencoding or replace
                     field = dictencoded[id]
                     values, _, _ = build(
                         field,
                         field.type,
                         batch,
                         recordbatch,
-                        dictencodings,
+                        dictencodingslockable,
                         Int64(1),
                         Int64(1),
                         convert,
                     )
-                    dictencoding = dictencodings[id]
-                    if typeof(dictencoding.data) <: ChainedVector
-                        append!(dictencoding.data, values)
-                    else
-                        A = ChainedVector([dictencoding.data, values])
-                        S =
-                            field.dictionary.indexType === nothing ? Int32 :
-                            juliaeltype(field, field.dictionary.indexType, false)
-                        dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
-                            id,
-                            A,
-                            field.dictionary.isOrdered,
-                            values.metadata,
-                        )
-                    end
-                    continue
-                end
-                # new dictencoding or replace
-                field = dictencoded[id]
-                values, _, _ = build(
-                    field,
-                    field.type,
-                    batch,
-                    recordbatch,
-                    dictencodings,
-                    Int64(1),
-                    Int64(1),
-                    convert,
-                )
-                A = values
-                S =
-                    field.dictionary.indexType === nothing ? Int32 :
-                    juliaeltype(field, field.dictionary.indexType, false)
-                dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
-                    id,
-                    A,
-                    field.dictionary.isOrdered,
-                    values.metadata,
-                )
+                    A = values
+                    S =
+                        field.dictionary.indexType === nothing ? Int32 :
+                        juliaeltype(field, field.dictionary.indexType, false)
+                    dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
+                        id,
+                        A,
+                        field.dictionary.isOrdered,
+                        values.metadata,
+                    )
+                end # lock
                 @debug "parsed dictionary batch message: id=$id, data=$values\n"
             elseif header isa Meta.RecordBatch
                 anyrecordbatches = true
                 @debug "parsing record batch message: compression = $(header.compression)"
                 @wkspawn begin
-                    cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
+                    cols = collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
                     put!(() -> put!(tsks, cols), sync, $(rbi))
                 end
                 rbi += 1
@@ -610,7 +616,7 @@ end
 struct VectorIterator
     schema::Meta.Schema
     batch::Batch # batch.msg.header MUST BE RecordBatch
-    dictencodings::Dict{Int64,DictEncoding}
+    dictencodings::Lockable{Dict{Int64,DictEncoding}}
     convert::Bool
 end
 
@@ -654,14 +660,16 @@ function build(field::Meta.Field, batch, rb, de, nodeidx, bufferidx, convert)
         buffer = rb.buffers[bufferidx]
         S = d.indexType === nothing ? Int32 : juliaeltype(field, d.indexType, false)
         bytes, indices = reinterp(S, batch, buffer, rb.compression)
-        encoding = de[d.id]
-        A = DictEncoded(
-            bytes,
-            validity,
-            indices,
-            encoding,
-            buildmetadata(field.custom_metadata),
-        )
+        @lock de begin
+            encoding = de[][d.id]
+            A = DictEncoded(
+                bytes,
+                validity,
+                indices,
+                encoding,
+                buildmetadata(field.custom_metadata),
+            )
+        end
         nodeidx += 1
         bufferidx += 1
     else

Reply via email to