This is an automated email from the ASF dual-hosted git repository.
quinnj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new fc8b899 Make DictEncoding reading threadsafe (#535)
fc8b899 is described below
commit fc8b899356a4298a25a08a9f314be0a49bb2bbc5
Author: Jacob Quinn <[email protected]>
AuthorDate: Tue Nov 26 21:22:15 2024 -0700
Make DictEncoding reading threadsafe (#535)
Fixes #534
---
.github/workflows/ci.yml | 3 +-
Project.toml | 2 +-
src/table.jl | 176 +++++++++++++++++++++++++----------------------
3 files changed, 95 insertions(+), 86 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5356fea..911910e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -76,7 +76,8 @@ jobs:
- name: ArrowTypes.jl
dir: './src/ArrowTypes'
version:
- - '1.6'
+ - 'min'
+ - 'lts'
- '1' # automatically expands to the latest stable 1.x release of
Julia
- 'nightly'
os:
diff --git a/Project.toml b/Project.toml
index b7fb92a..ffc5729 100644
--- a/Project.toml
+++ b/Project.toml
@@ -49,4 +49,4 @@ SentinelArrays = "1"
Tables = "1.1"
TimeZones = "1"
TranscodingStreams = "0.9.12, 0.10, 0.11"
-julia = "1.6"
+julia = "1.9"
diff --git a/src/table.jl b/src/table.jl
index 6dfb9b4..6990341 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -70,7 +70,7 @@ mutable struct Stream
names::Vector{Symbol}
types::Vector{Type}
schema::Union{Nothing,Meta.Schema}
- dictencodings::Dict{Int64,DictEncoding} # dictionary id => DictEncoding
+ dictencodings::Lockable{Dict{Int64,DictEncoding}} # dictionary id =>
DictEncoding
dictencoded::Dict{Int64,Meta.Field} # dictionary id => field
convert::Bool
compression::Ref{Union{Symbol,Nothing}}
@@ -82,7 +82,7 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
names = Symbol[]
types = Type[]
schema = nothing
- dictencodings = Dict{Int64,DictEncoding}()
+ dictencodings = Lockable(Dict{Int64,DictEncoding}())
dictencoded = Dict{Int64,Meta.Field}()
compression = Ref{Union{Symbol,Nothing}}(nothing)
Stream(
@@ -210,8 +210,26 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
if recordbatch.compression !== nothing
compression = recordbatch.compression
end
- if haskey(x.dictencodings, id) && header.isDelta
- # delta
+ @lock x.dictencodings begin
+ dictencodings = x.dictencodings[]
+ if haskey(dictencodings, id) && header.isDelta
+ # delta
+ field = x.dictencoded[id]
+ values, _, _ = build(
+ field,
+ field.type,
+ batch,
+ recordbatch,
+ x.dictencodings,
+ Int64(1),
+ Int64(1),
+ x.convert,
+ )
+ dictencoding = dictencodings[id]
+ append!(dictencoding.data, values)
+ continue
+ end
+ # new dictencoding or replace
field = x.dictencoded[id]
values, _, _ = build(
field,
@@ -223,32 +241,17 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
Int64(1),
x.convert,
)
- dictencoding = x.dictencodings[id]
- append!(dictencoding.data, values)
- continue
- end
- # new dictencoding or replace
- field = x.dictencoded[id]
- values, _, _ = build(
- field,
- field.type,
- batch,
- recordbatch,
- x.dictencodings,
- Int64(1),
- Int64(1),
- x.convert,
- )
- A = ChainedVector([values])
- S =
- field.dictionary.indexType === nothing ? Int32 :
- juliaeltype(field, field.dictionary.indexType, false)
- x.dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
- id,
- A,
- field.dictionary.isOrdered,
- values.metadata,
- )
+ A = ChainedVector([values])
+ S =
+ field.dictionary.indexType === nothing ? Int32 :
+ juliaeltype(field, field.dictionary.indexType, false)
+ dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
+ id,
+ A,
+ field.dictionary.isOrdered,
+ values.metadata,
+ )
+ end # lock
@debug "parsed dictionary batch message: id=$id, data=$values\n"
elseif header isa Meta.RecordBatch
@debug "parsing record batch message: compression =
$(header.compression)"
@@ -415,7 +418,7 @@ Table(inputs::Vector; kw...) =
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
t = Table()
sch = nothing
- dictencodings = Dict{Int64,DictEncoding}() # dictionary id => DictEncoding
+ dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary
id => DictEncoding
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
@@ -465,65 +468,68 @@ function Table(blobs::Vector{ArrowBlob};
convert::Bool=true)
elseif header isa Meta.DictionaryBatch
id = header.id
recordbatch = header.data
@debug "parsing dictionary batch message: id = $id, compression = $(recordbatch.compression)"
+ @lock dictencodingslockable begin
+ dictencodings = dictencodingslockable[]
+ if haskey(dictencodings, id) && header.isDelta
+ # delta
+ field = dictencoded[id]
+ values, _, _ = build(
+ field,
+ field.type,
+ batch,
+ recordbatch,
+ dictencodingslockable,
+ Int64(1),
+ Int64(1),
+ convert,
+ )
+ dictencoding = dictencodings[id]
+ if typeof(dictencoding.data) <: ChainedVector
+ append!(dictencoding.data, values)
+ else
+ A = ChainedVector([dictencoding.data, values])
+ S =
+ field.dictionary.indexType === nothing ? Int32
:
+ juliaeltype(field, field.dictionary.indexType,
false)
+ dictencodings[id] =
DictEncoding{eltype(A),S,typeof(A)}(
+ id,
+ A,
+ field.dictionary.isOrdered,
+ values.metadata,
+ )
+ end
+ continue
+ end
+ # new dictencoding or replace
field = dictencoded[id]
values, _, _ = build(
field,
field.type,
batch,
recordbatch,
- dictencodings,
+ dictencodingslockable,
Int64(1),
Int64(1),
convert,
)
- dictencoding = dictencodings[id]
- if typeof(dictencoding.data) <: ChainedVector
- append!(dictencoding.data, values)
- else
- A = ChainedVector([dictencoding.data, values])
- S =
- field.dictionary.indexType === nothing ? Int32 :
- juliaeltype(field, field.dictionary.indexType,
false)
- dictencodings[id] =
DictEncoding{eltype(A),S,typeof(A)}(
- id,
- A,
- field.dictionary.isOrdered,
- values.metadata,
- )
- end
- continue
- end
- # new dictencoding or replace
- field = dictencoded[id]
- values, _, _ = build(
- field,
- field.type,
- batch,
- recordbatch,
- dictencodings,
- Int64(1),
- Int64(1),
- convert,
- )
- A = values
- S =
- field.dictionary.indexType === nothing ? Int32 :
- juliaeltype(field, field.dictionary.indexType, false)
- dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
- id,
- A,
- field.dictionary.isOrdered,
- values.metadata,
- )
+ A = values
+ S =
+ field.dictionary.indexType === nothing ? Int32 :
+ juliaeltype(field, field.dictionary.indexType, false)
+ dictencodings[id] = DictEncoding{eltype(A),S,typeof(A)}(
+ id,
+ A,
+ field.dictionary.isOrdered,
+ values.metadata,
+ )
+ end # lock
@debug "parsed dictionary batch message: id=$id,
data=$values\n"
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debug "parsing record batch message: compression =
$(header.compression)"
@wkspawn begin
- cols = collect(VectorIterator(sch, $batch, dictencodings,
convert))
+ cols = collect(VectorIterator(sch, $batch,
dictencodingslockable, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
rbi += 1
@@ -610,7 +616,7 @@ end
struct VectorIterator
schema::Meta.Schema
batch::Batch # batch.msg.header MUST BE RecordBatch
- dictencodings::Dict{Int64,DictEncoding}
+ dictencodings::Lockable{Dict{Int64,DictEncoding}}
convert::Bool
end
@@ -654,14 +660,16 @@ function build(field::Meta.Field, batch, rb, de, nodeidx,
bufferidx, convert)
buffer = rb.buffers[bufferidx]
S = d.indexType === nothing ? Int32 : juliaeltype(field, d.indexType,
false)
bytes, indices = reinterp(S, batch, buffer, rb.compression)
- encoding = de[d.id]
- A = DictEncoded(
- bytes,
- validity,
- indices,
- encoding,
- buildmetadata(field.custom_metadata),
- )
+ @lock de begin
+ encoding = de[][d.id]
+ A = DictEncoded(
+ bytes,
+ validity,
+ indices,
+ encoding,
+ buildmetadata(field.custom_metadata),
+ )
+ end
nodeidx += 1
bufferidx += 1
else