This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 098cbbddbcd7 [SPARK-57185][SQL] Use thread-local ICU collators to fix
lock contention in CollationFactory
098cbbddbcd7 is described below
commit 098cbbddbcd7ae18688810194d975a43843f7bf1
Author: Dejan Krakovic <[email protected]>
AuthorDate: Mon Jun 1 21:11:50 2026 +0800
[SPARK-57185][SQL] Use thread-local ICU collators to fix lock contention in
CollationFactory
### What changes were proposed in this pull request?
Use thread-local `Collator` instances in
`CollationSpecICU.buildCollation()` to eliminate lock contention on ICU's
`RuleBasedCollator`. A frozen `RuleBasedCollator` serializes all threads
through a `ReentrantLock` on its internal collation buffer (used by
`getCollationKey`/`compare`), which causes a significant loss of parallelism
when many threads compare or hash collated strings concurrently.
Creating an independent per-thread instance via `Collator.getInstance()`
lets each thread operate on its own buffer without locking. Each instance is
still frozen as a mutation guard. The `Collation.getCollator()` accessor now
returns the current thread's instance (or `null` for non-ICU collations).
### Why are the changes needed?
To remove a concurrency bottleneck when comparing or hashing collated
columns under parallel access.
### Does this PR introduce _any_ user-facing change?
No. This is purely a concurrency optimization; collation results are
identical.
### How was this patch tested?
Added a concurrent test in `CollationFactorySuite` that verifies that
`comparator`, `sortKeyFunction`, and `getCollator()` produce consistent results
under parallel access across the `UNICODE`, `en`, `de`, `en_CI`, and `en_AI`
collations. Existing `CollationFactorySuite` tests continue to pass.
### Was this patch authored or co-authored using generative AI tooling?
Yes, co-authored using Claude Code.
Closes #56236 from dejankrak-db/thread-local-collator-fix-oss.
Authored-by: Dejan Krakovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 89092b956884ce0e2161f9fbcf1996a29e9d8c38)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/util/CollationFactory.java | 35 ++++++++++++++--------
.../spark/unsafe/types/CollationFactorySuite.scala | 31 +++++++++++++++++++
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
index 3e2bfbcd87ca..8df59b1f6e34 100644
---
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
+++
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -110,7 +110,7 @@ public final class CollationFactory {
public static class Collation {
public final String collationName;
public final String provider;
- private final Collator collator;
+ private final ThreadLocal<Collator> threadLocalCollator;
public final Comparator<UTF8String> comparator;
/**
@@ -187,7 +187,7 @@ public final class CollationFactory {
public Collation(
String collationName,
String provider,
- Collator collator,
+ ThreadLocal<Collator> threadLocalCollator,
Comparator<UTF8String> comparator,
String version,
Function<UTF8String, byte[]> sortKeyFunction,
@@ -197,7 +197,7 @@ public final class CollationFactory {
boolean supportsSpaceTrimming) {
this.collationName = collationName;
this.provider = provider;
- this.collator = collator;
+ this.threadLocalCollator = threadLocalCollator;
this.comparator = comparator;
this.version = version;
this.sortKeyFunction = sortKeyFunction;
@@ -216,7 +216,7 @@ public final class CollationFactory {
}
public Collator getCollator() {
- return collator;
+ return threadLocalCollator != null ? threadLocalCollator.get() : null;
}
/**
@@ -1016,29 +1016,40 @@ public final class CollationFactory {
builder.setUnicodeLocaleKeyword("ks", "level1");
}
ULocale resultLocale = builder.build();
- Collator collator = Collator.getInstance(resultLocale);
- // Freeze ICU collator to ensure thread safety.
- collator.freeze();
+
+ // Use thread-local Collator instances to avoid lock contention.
+ // A frozen RuleBasedCollator serializes all threads through a
ReentrantLock on its
+ // internal collation buffer (used by getCollationKey/compare). By
creating independent
+ // per-thread instances via Collator.getInstance(), each thread
operates on its own
+ // buffer without locking. Each instance is frozen as a mutation guard
so that any
+ // accidental call to setStrength() or similar throws immediately.
+ ThreadLocal<Collator> threadLocalCollator = ThreadLocal.withInitial(
+ () -> {
+ Collator collator = Collator.getInstance(resultLocale);
+ collator.freeze();
+ return collator;
+ });
Comparator<UTF8String> comparator;
Function<UTF8String, byte[]> sortKeyFunction;
if (spaceTrimming == SpaceTrimming.NONE) {
comparator = (s1, s2) ->
- collator.compare(s1.toValidString(), s2.toValidString());
- sortKeyFunction = s ->
collator.getCollationKey(s.toValidString()).toByteArray();
+ threadLocalCollator.get().compare(s1.toValidString(),
s2.toValidString());
+ sortKeyFunction = s ->
+
threadLocalCollator.get().getCollationKey(s.toValidString()).toByteArray();
} else {
- comparator = (s1, s2) -> collator.compare(
+ comparator = (s1, s2) -> threadLocalCollator.get().compare(
applyTrimmingPolicy(s1, spaceTrimming).toValidString(),
applyTrimmingPolicy(s2, spaceTrimming).toValidString());
- sortKeyFunction = s -> collator.getCollationKey(
+ sortKeyFunction = s -> threadLocalCollator.get().getCollationKey(
applyTrimmingPolicy(s,
spaceTrimming).toValidString()).toByteArray();
}
return new Collation(
normalizedCollationName(),
PROVIDER_ICU,
- collator,
+ threadLocalCollator,
comparator,
ICU_VERSION,
sortKeyFunction,
diff --git
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
index ddf588b6c64c..87f1d0a1c75f 100644
---
a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
+++
b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala
@@ -300,6 +300,37 @@ class CollationFactorySuite extends AnyFunSuite with
Matchers { // scalastyle:ig
})
}
+ test("test concurrent comparator, sortKeyFunction, and getCollator on ICU
collations") {
+ // Thread-local collator instances avoid lock contention on ICU's internal
collation buffer.
+ // This test verifies correctness under concurrent access for all three
paths:
+ // comparator, sortKeyFunction, and getCollator().
+ val collationNames = Seq("UNICODE", "en", "de", "en_CI", "en_AI")
+ collationNames.foreach { name =>
+ val collation = fetchCollation(name)
+ val s1 = toUTF8("apple")
+ val s2 = toUTF8("banana")
+ val expectedCmp = collation.comparator.compare(s1, s2)
+ val expectedKey =
collation.sortKeyFunction.apply(s1).asInstanceOf[Array[Byte]]
+ val expectedCollatorKey =
+ collation.getCollator.getCollationKey(s1.toValidString()).toByteArray
+
+ (0 to 5).foreach(_ => {
+ IntStream.rangeClosed(0, 200).parallel().forEach { _ =>
+ val cmp = collation.comparator.compare(s1, s2)
+ assert(cmp == expectedCmp,
+ s"Comparator returned inconsistent result for $name")
+ val key =
collation.sortKeyFunction.apply(s1).asInstanceOf[Array[Byte]]
+ assert(java.util.Arrays.equals(key, expectedKey),
+ s"sortKeyFunction returned inconsistent result for $name")
+ val collatorKey =
+
collation.getCollator.getCollationKey(s1.toValidString()).toByteArray
+ assert(java.util.Arrays.equals(collatorKey, expectedCollatorKey),
+ s"getCollator().getCollationKey() returned inconsistent result for
$name")
+ }
+ })
+ }
+ }
+
test("test collation caching") {
Seq(
"UTF8_BINARY",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]