This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-julia.git
The following commit(s) were added to refs/heads/main by this push:
new b8ebfc5 fix test failures on 1.12, avoid race condition in
multithreaded partitioned writes (#582)
b8ebfc5 is described below
commit b8ebfc558807bd2176bc3a3b107f9c3e1704f27c
Author: Phillip Alday <[email protected]>
AuthorDate: Mon Jan 12 19:12:45 2026 -0600
fix test failures on 1.12, avoid race condition in multithreaded
partitioned writes (#582)
There is a race condition fundamental to the current architecture for
creating and writing dictionary encodings. The relevant lock is created
on a worker thread and thus there is a race to _create_ the lock and
initialize the relevant data structure. This race condition has existed
for a long time and consistently occurs when testing on 1.12, but I have
occasionally been able to see it occur on Julia 1.10.
Reworking this goes well beyond what I currently have time for, so I
have simply disabled multithreaded writing as a stopgap. This may seem
extreme, but:
1. This is a correctness bug and correctness is far more important than
speed.
2. The test failures that this race condition causes on 1.12 are
blocking the release of 2.8.1, which includes #543 and addresses another
source of potential correctness issues on Julia 1.12+.
---
src/arraytypes/dictencoding.jl | 4 ++
src/write.jl | 84 +++++++++++++++++++++---------------------
test/Project.toml | 2 +
test/runtests.jl | 25 ++++++++-----
4 files changed, 65 insertions(+), 50 deletions(-)
diff --git a/src/arraytypes/dictencoding.jl b/src/arraytypes/dictencoding.jl
index fe48304..84d5105 100644
--- a/src/arraytypes/dictencoding.jl
+++ b/src/arraytypes/dictencoding.jl
@@ -142,6 +142,8 @@ function arrowvector(
kw...,
)
id = x.encoding.id
+ # XXX This is a race condition: if two workers hit this block at the same time, then they'll create
+ # distinct locks
if !haskey(de, id)
de[id] = Lockable(x.encoding)
else
@@ -215,6 +217,8 @@ function arrowvector(
x = x.data
len = length(x)
validity = ValidityBitmap(x)
+ # XXX This is a race condition: if two workers hit this block at the same time, then they'll create
+ # distinct locks
if !haskey(de, id)
# dict encoding doesn't exist yet, so create for 1st time
if DataAPI.refarray(x) === x || DataAPI.refpool(x) === nothing
diff --git a/src/write.jl b/src/write.jl
index 1f1bfd1..4c3800f 100644
--- a/src/write.jl
+++ b/src/write.jl
@@ -295,47 +295,49 @@ function write(writer::Writer, source)
recbatchmsg = makerecordbatchmsg(writer.schema[], cols, writer.alignment)
put!(writer.msgs, recbatchmsg)
else
- if writer.threaded
- @wkspawn process_partition(
- tblcols,
- writer.dictencodings,
- writer.largelists,
- writer.compress,
- writer.denseunions,
- writer.dictencode,
- writer.dictencodenested,
- writer.maxdepth,
- writer.sync,
- writer.msgs,
- writer.alignment,
- $(writer.partition_count),
- writer.schema,
- writer.errorref,
- writer.anyerror,
- writer.meta,
- writer.colmeta,
- )
- else
- @async process_partition(
- tblcols,
- writer.dictencodings,
- writer.largelists,
- writer.compress,
- writer.denseunions,
- writer.dictencode,
- writer.dictencodenested,
- writer.maxdepth,
- writer.sync,
- writer.msgs,
- writer.alignment,
- $(writer.partition_count),
- writer.schema,
- writer.errorref,
- writer.anyerror,
- writer.meta,
- writer.colmeta,
- )
- end
+ # XXX There is a race condition in the processing of dict encodings
+ # so we disable multithreaded writing until that can be addressed. See #582
+ # if writer.threaded
+ # @wkspawn process_partition(
+ # tblcols,
+ # writer.dictencodings,
+ # writer.largelists,
+ # writer.compress,
+ # writer.denseunions,
+ # writer.dictencode,
+ # writer.dictencodenested,
+ # writer.maxdepth,
+ # writer.sync,
+ # writer.msgs,
+ # writer.alignment,
+ # $(writer.partition_count),
+ # writer.schema,
+ # writer.errorref,
+ # writer.anyerror,
+ # writer.meta,
+ # writer.colmeta,
+ # )
+ # else
+ @async process_partition(
+ tblcols,
+ writer.dictencodings,
+ writer.largelists,
+ writer.compress,
+ writer.denseunions,
+ writer.dictencode,
+ writer.dictencodenested,
+ writer.maxdepth,
+ writer.sync,
+ writer.msgs,
+ writer.alignment,
+ $(writer.partition_count),
+ writer.schema,
+ writer.errorref,
+ writer.anyerror,
+ writer.meta,
+ writer.colmeta,
+ )
+ # end
end
writer.partition_count += 1
end
diff --git a/test/Project.toml b/test/Project.toml
index 93977a9..1079926 100644
--- a/test/Project.toml
+++ b/test/Project.toml
@@ -31,6 +31,7 @@ SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
+TestSetExtensions = "98d24dd4-01ad-11ea-1b02-c9a08f80db04"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
[compat]
@@ -44,4 +45,5 @@ PooledArrays = "1"
StructTypes = "1"
SentinelArrays = "1"
Tables = "1"
+TestSetExtensions = "3"
TimeZones = "1"
diff --git a/test/runtests.jl b/test/runtests.jl
index 9ca171f..b068be1 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -28,12 +28,16 @@ using DataAPI
using FilePathsBase
using DataFrames
import Random: randstring
+using TestSetExtensions: ExtendedTestSet
+# this formulation tests the loaded ArrowTypes, even if it's not the dev version
+# within the mono-repo
include(joinpath(dirname(pathof(ArrowTypes)), "../test/tests.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
-include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))
+
+include(joinpath(@__DIR__, "testtables.jl"))
+include(joinpath(@__DIR__, "testappend.jl"))
+include(joinpath(@__DIR__, "integrationtest.jl"))
+include(joinpath(@__DIR__, "dates.jl"))
struct CustomStruct
x::Int
@@ -45,7 +49,7 @@ struct CustomStruct2{sym}
x::Int
end
-@testset "Arrow" begin
+@testset ExtendedTestSet "Arrow" begin
@testset "table roundtrips" begin
for case in testtables
testtable(case...)
@@ -381,6 +385,8 @@ end
end
@testset "# 126" begin
+ # XXX This test also captures a race condition in multithreaded
+ # writes of dictionary encoded arrays
t = Tables.partitioner((
(a=Arrow.toarrowvector(PooledArray([1, 2, 3])),),
(a=Arrow.toarrowvector(PooledArray([1, 2, 3, 4])),),
@@ -602,14 +608,15 @@ end
end
@testset "# 181" begin
+ # XXX this test hangs on Julia 1.12 when using a deeper nesting
d = Dict{Int,Int}()
- for i = 1:9
+ for i = 1:1
d = Dict(i => d)
end
tbl = (x=[d],)
- msg = "reached nested serialization level (20) deeper than provided max depth argument (19); to increase allowed nesting level, pass `maxdepth=X`"
- @test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=19)
- @test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=20)).x == tbl.x
+ msg = "reached nested serialization level (2) deeper than provided max depth argument (1); to increase allowed nesting level, pass `maxdepth=X`"
+ @test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=1)
+ @test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=5)).x == tbl.x
end
@testset "# 167" begin