[
https://issues.apache.org/jira/browse/SPARK-57055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kent Yao updated SPARK-57055:
-----------------------------
Description:
h3. Retraction of original report (2026-05-26)
This ticket was originally filed (2026-05-26) as a Bug claiming
{{InjectRuntimeFilter}} produces silent false negatives on non-binary
collation StringType join keys. *That claim was wrong* and is hereby
retracted.
Empirical disproof: a fresh end-to-end test
({{InjectRuntimeFilterCollationSuite}}, 3 cases: UTF8_LCASE hit /
UNICODE_CI hit / UTF8_BINARY no-op) PASSES on master @ {{29fdcefb8f2}}
with no fix applied. Verbatim optimized plan dump:
{noformat}
Project [v#7]
+- Join Inner, (collationkey(k#6) = collationkey(k#8))
:- Filter might_contain(scalar-subquery#14 [],
xxhash64(collationkey(k#6), 42))
: : +- Aggregate [bloom_filter_agg(
: : xxhash64(collationkey(k#8), 42), ...) AS
bloomFilter#13]
: : +- Project [k#8]
: : +- Filter (isnotnull(tag#9) AND (tag#9 = 1))
: : +- Relation spark_catalog.default.bf_l[k#8,tag#9] parquet
{noformat}
Root cause: {{RewriteCollationJoin}} analyzer rule
({{sql/catalyst/.../analysis/RewriteCollationJoin.scala}}, introduced
by SPARK-48000 / {{e6236af3d08}}) already wraps non-binary collated
join keys with {{CollationKey(expr)}} *before* the optimizer runs.
{{InjectRuntimeFilter}} therefore sees an already-wrapped key and
emits {{xxhash64(collationkey(k), 42)}} on both build and probe sides
symmetrically. The BloomFilter contract holds by construction.
The original reproducer measured a hand-written SQL shape
({{xxhash64(name)}} composed by the user) that {{InjectRuntimeFilter}}
never produces. Failing test != failing production path; I conflated
the two. Apologies for the noise.
----
h3. Reshape: this ticket now tracks a different, narrower question
Reclassified to Improvement / Minor.
h4. Topic — DataFrameStatFunctions.bloomFilter is collation-blind
{{DataFrameStatFunctions.bloomFilter(col, expectedNumItems, numBits)}}
(public API since 2.0) is collation-blind. When invoked over a column
whose schema collation is non-binary (e.g. UTF8_LCASE), the resulting
{{org.apache.spark.util.sketch.BloomFilter}} treats values byte-wise:
{{mightContainString("alice")}} returns {{false}} after building over
{{"Alice"}} under UTF8_LCASE, even though the column's own {{=}}
semantics would consider the two equal.
Empirical evidence (test {{BloomFilterPath2ReproSuite}} on master @
{{29fdcefb8f2}}):
{noformat}
[PATH2-SCHEMA] name: string collate UTF8_LCASE
[PATH2-OUTCOME] BUILT-OK | mightContain('Alice')=true
| mightContain('alice')=false
| mightContain('Bob')=true
| mightContain('bob')=false
[PATH2-BASELINE] binary col: 'Alice'=true 'alice'=false
{noformat}
The UTF8_LCASE column behaves byte-for-byte identically to the
UTF8_BINARY baseline -- the API ignores collation.
h4. Strict contract analysis
The returned object is {{org.apache.spark.util.sketch.BloomFilter}}, a
plain Java sketch. Its {{mightContainString(String)}} method has never
promised collation-aware semantics; it operates on raw UTF-8 bytes.
Strictly speaking, no contract is violated.
However, from a Spark SQL user's perspective there is a real semantic
gap: {{WHERE name = 'alice'}} on a UTF8_LCASE column matches {{Alice}},
but the BloomFilter built from the same column via this API does not.
The mismatch is silent (no exception, no warning) and easy to miss.
h4. Possible directions (not committed)
Listed for discussion; ticket scope is "decide a direction", not
"prescribe one".
* {*}Reject at API boundary{*}: throw
{{IllegalArgumentException}} when called on a non-UTF8_BINARY
collation column. Smallest behavioural change; would break any
existing user (if any) silently relying on the byte-wise semantics.
* {*}Wrap with CollationKey internally{*}: at the API call site,
rewrite the column to {{CollationKey(col)}} before invoking
{{bloom_filter_agg}}. Build side becomes collation-aware. Probe
side ({{BloomFilter.mightContainString}}) would also need a
wrapper, ideally a {{CollationAwareBloomFilter}} subclass that
applies {{getCollationKey}} on each probe. Larger surface change;
serialization compatibility and PySpark parity need separate
thought.
* {*}Documentation only{*}: clarify in scaladoc that
{{DataFrameStatFunctions.bloomFilter}} is byte-wise regardless of
column collation, and recommend explicit
{{collation_key(col)}} composition. Smallest scope, leaves the
silent gap in place.
h4. Reproducer
Test file
{{sql/core/src/test/scala/org/apache/spark/sql/BloomFilterPath2ReproSuite.scala}}
(local at this reporter's worktree; can be contributed if there is
appetite for direction (a) or (b)).
----
h3. Out of scope for this ticket
* {{InjectRuntimeFilter}} / runtime bloom filter on join keys --
already correct via {{RewriteCollationJoin}} (see retraction
above).
* User-written {{bloom_filter_agg(xxhash64(string_col))}} +
{{might_contain(bf, xxhash64('literal'))}} -- a user is hashing
manually; the result depends on what the user wrote.
* {{BloomFilterAggregate.BinaryUpdater}} -- reachable only via the
same Java-API path above; covered if that path is fixed.
----
Reporter: Kent Yao
Affects Versions: 4.0.0, 4.1.0, 5.0.0
Component: SQL
was:
h3. Summary
{{bloom_filter_agg}} + {{might_contain}} and the runtime bloom filter injected
by {{InjectRuntimeFilter}} both hash raw UTF8 bytes via
{{XxHash64(stringExpr)}},
ignoring {{StringType.collationId}}. For any non-binary collation (UTF8_LCASE,
UNICODE_CI, all ICU-backed accent/case-insensitive collations), strings that
are semantically equal under the collation but byte-different (e.g. {{'Alice'}}
vs {{'alice'}} under UTF8_LCASE) hash to different long values and miss the
bloom. This violates BloomFilter's "no false negative" contract and silently
drops matching rows in BF-filtered joins.
h3. Origin
SPARK-46832 (commit {{861cca3da4c}}, Feb 2024) flipped the type pattern in
{{BloomFilterAggregate.scala}} to accept any-collation StringType but kept the
collation-blind {{BinaryUpdater}}:
{code:scala}
- case (LongType | IntegerType | ShortType | ByteType | StringType, LongType,
LongType) =>
+ case (LongType | IntegerType | ShortType | ByteType | _: StringType,
LongType, LongType) =>
- case StringType => BinaryUpdater
+ case _: StringType => BinaryUpdater
{code}
{{InjectRuntimeFilter.scala:64,73}} builds and probes with
{{new XxHash64(Seq(filterCreationSideKey/filterApplicationSideKey))}} — no
collation-aware hashing.
h3. Reproducer
Drop into {{sql/core/src/test/scala/org/apache/spark/sql/}}:
{code:scala}
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain,
Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.test.SharedSparkSession
class BloomFilterCollationReproSuite extends QueryTest with SharedSparkSession {
private def sessionFuncId(n: String) =
FunctionIdentifier(n, Some(CatalogManager.SESSION_NAMESPACE),
Some(CatalogManager.SYSTEM_CATALOG_NAME))
override def beforeAll(): Unit = {
super.beforeAll()
spark.sessionState.functionRegistry.registerFunction(
sessionFuncId("bloom_filter_agg"),
new ExpressionInfo(classOf[BloomFilterAggregate].getName,
"bloom_filter_agg"),
(cs: Seq[Expression]) => cs.size match {
case 1 => new BloomFilterAggregate(cs.head)
case 2 => new BloomFilterAggregate(cs.head, cs(1))
case 3 => new BloomFilterAggregate(cs.head, cs(1), cs(2))
})
spark.sessionState.functionRegistry.registerFunction(
sessionFuncId("might_contain"),
new ExpressionInfo(classOf[BloomFilterMightContain].getName,
"might_contain"),
(cs: Seq[Expression]) => BloomFilterMightContain(cs.head, cs(1)))
}
test("LCASE silent FN: xxhash64(Alice) build vs xxhash64(alice) probe") {
val r = spark.sql(
"""SELECT might_contain(
| (SELECT bloom_filter_agg(xxhash64(name))
| FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
| xxhash64(cast('alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
.collect()(0).getBoolean(0)
println(s"[REPRO] LCASE 'alice' hit = $r (expect true under LCASE; observed
false = BUG)")
}
test("LCASE baseline: same string Alice/Alice must hit") {
val r = spark.sql(
"""SELECT might_contain(
| (SELECT bloom_filter_agg(xxhash64(name))
| FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
| xxhash64(cast('Alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
.collect()(0).getBoolean(0)
println(s"[REPRO-baseline] same-string Alice/Alice hit = $r (expect true)")
}
}
{code}
Run: {{build/sbt 'sql/testOnly
org.apache.spark.sql.BloomFilterCollationReproSuite'}}
Observed output (on master {{29fdcefb8f2}}, 5.0.0-SNAPSHOT, 2026-05-25):
{code}
[REPRO] LCASE 'alice' hit = false (expect true under LCASE; observed
false = BUG)
[REPRO-baseline] same-string Alice/Alice hit = true (expect true)
{code}
The repro path mirrors what {{InjectRuntimeFilter}} injects, so the same silent
FN reaches any {{JOIN ON t1.s = t2.s}} where {{s}} is non-binary collation and
{{spark.sql.optimizer.runtime.bloomFilter.enabled = true}}.
h3. Impact
- User-facing: {{bloom_filter_agg(s COLLATE UTF8_LCASE)}} +
{{might_contain(...)}}
returns false for collation-equal probes → silent wrong result.
- Optimizer: {{spark.sql.optimizer.runtime.bloomFilter.enabled}} (on by default
in many distributions) plus non-binary collation join keys → join silently
drops collation-equal rows. The user sees a smaller result set than the
semantics promise, with no error or warning.
h3. Suggested remediation
Three options. We recommend (1) as it preserves the feature for the dominant
in-query use case and adds an explicit cross-version caveat for the rare
persisted case.
1. *Collation-aware hashing.* Replace {{XxHash64(stringExpr)}} build/probe
with {{XxHash64}} over {{CollationFactory.getCollationKey(collationId, str)}}
bytes. Query-internal BF (the dominant case — runtime BF, single-query
{{bloom_filter_agg}}) is fully correct because build and probe share one
JVM / one icu4j version. Cost is a single {{Collator.getCollationKey}} per
probe (cached {{Collator}} instance per {{collationId}}).
Caveat: BF results that are persisted (table write, RDD checkpoint, Connect
cross-version client/server) become invalid across icu4j upgrades. icu4j
75 → 76 altered 23/33 cells in a cross-version probe (`en_US 'a': 0x2a →
0x2b`), 77.1 → 78.3 altered 4/33 (see PR #56096
{{ICUCollationSortKeyGoldenSuite}} for the cross-version visibility golden).
Recommend documenting {{bloom_filter_agg}} output as non-cross-version-
stable, similar to the existing {{Collation*Benchmark*-results.txt}}
convention.
2. *Reject non-binary StringType* in
{{BloomFilterAggregate.checkInputDataTypes}}
and skip injection in {{InjectRuntimeFilter}} for non-binary string join
keys.
Typed error ({{BLOOM_FILTER_NON_BINARY_COLLATION_NOT_SUPPORTED}} or similar).
Safe and minimal; removes a valid in-query use case. Fallback if (1) is
rejected on cost grounds.
3. *Document-only* (recommend {{STRING COLLATE UTF8_BINARY}} for BF keys).
Inadequate; the bug has been silent for ~2 years and added documentation
will not surface it to users hit via default-on {{runtime.bloomFilter}}.
h3. Why this is a correctness bug (not a perf / known-limit)
The BloomFilter contract is "no false negatives". The current behavior
provides silent false negatives under a feature (non-binary collation) that
the type system explicitly accepts. There is no error, warning, or
documented caveat. Users querying a SQL surface that says "collated string
equality is supported" receive results inconsistent with that contract.
h3. References
- Commit {{861cca3da4c}} — SPARK-46832 {{[SQL] Introducing Collate and
Collation expressions}}
-
{{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala:81,159}}
-
{{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:64,73}}
-
{{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CollationFactory.java}}
— {{getCollationKey(id, str)}} is the canonical sort-key API
- PR apache/spark#56096 — {{ICUCollationSortKeyGoldenSuite}} provides the
cross-version drift evidence cited in the caveat above
Issue Type: Improvement (was: Bug)
Priority: Minor (was: Major)
Summary: DataFrameStatFunctions.bloomFilter is collation-blind on
non-binary collation columns (was: `bloom_filter_agg` and
`InjectRuntimeFilter` produce silent false negatives on non-binary collation
StringType)
> DataFrameStatFunctions.bloomFilter is collation-blind on non-binary collation
> columns
> -------------------------------------------------------------------------------------
>
> Key: SPARK-57055
> URL: https://issues.apache.org/jira/browse/SPARK-57055
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 4.1.0, 4.0.0, 5.0.0
> Reporter: Kent Yao
> Priority: Minor
>
> h3. Retraction of original report (2026-05-26)
> This ticket was originally filed (2026-05-26) as a Bug claiming
> {{InjectRuntimeFilter}} produces silent false negatives on non-binary
> collation StringType join keys. *That claim was wrong* and is hereby
> retracted.
> Empirical disproof: a fresh end-to-end test
> ({{InjectRuntimeFilterCollationSuite}}, 3 cases: UTF8_LCASE hit /
> UNICODE_CI hit / UTF8_BINARY no-op) PASSES on master @ {{29fdcefb8f2}}
> with no fix applied. Verbatim optimized plan dump:
> {noformat}
> Project [v#7]
> +- Join Inner, (collationkey(k#6) = collationkey(k#8))
> :- Filter might_contain(scalar-subquery#14 [],
> xxhash64(collationkey(k#6), 42))
> : : +- Aggregate [bloom_filter_agg(
> : : xxhash64(collationkey(k#8), 42), ...) AS
> bloomFilter#13]
> : : +- Project [k#8]
> : : +- Filter (isnotnull(tag#9) AND (tag#9 = 1))
> : : +- Relation spark_catalog.default.bf_l[k#8,tag#9] parquet
> {noformat}
> Root cause: {{RewriteCollationJoin}} analyzer rule
> ({{sql/catalyst/.../analysis/RewriteCollationJoin.scala}}, introduced
> by SPARK-48000 / {{e6236af3d08}}) already wraps non-binary collated
> join keys with {{CollationKey(expr)}} *before* the optimizer runs.
> {{InjectRuntimeFilter}} therefore sees an already-wrapped key and
> emits {{xxhash64(collationkey(k), 42)}} on both build and probe sides
> symmetrically. The BloomFilter contract holds by construction.
> The original reproducer measured a hand-written SQL shape
> ({{xxhash64(name)}} composed by the user) that {{InjectRuntimeFilter}}
> never produces. Failing test != failing production path; I conflated
> the two. Apologies for the noise.
> ----
> h3. Reshape: this ticket now tracks a different, narrower question
> Reclassified to Improvement / Minor.
> h4. Topic — DataFrameStatFunctions.bloomFilter is collation-blind
> {{DataFrameStatFunctions.bloomFilter(col, expectedNumItems, numBits)}}
> (public API since 2.0) is collation-blind. When invoked over a column
> whose schema collation is non-binary (e.g. UTF8_LCASE), the resulting
> {{org.apache.spark.util.sketch.BloomFilter}} treats values byte-wise:
> {{mightContainString("alice")}} returns {{false}} after building over
> {{"Alice"}} under UTF8_LCASE, even though the column's own {{=}}
> semantics would consider the two equal.
> Empirical evidence (test {{BloomFilterPath2ReproSuite}} on master @
> {{29fdcefb8f2}}):
> {noformat}
> [PATH2-SCHEMA] name: string collate UTF8_LCASE
> [PATH2-OUTCOME] BUILT-OK | mightContain('Alice')=true
> | mightContain('alice')=false
> | mightContain('Bob')=true
> | mightContain('bob')=false
> [PATH2-BASELINE] binary col: 'Alice'=true 'alice'=false
> {noformat}
> The UTF8_LCASE column behaves byte-for-byte identically to the
> UTF8_BINARY baseline -- the API ignores collation.
> h4. Strict contract analysis
> The returned object is {{org.apache.spark.util.sketch.BloomFilter}}, a
> plain Java sketch. Its {{mightContainString(String)}} method has never
> promised collation-aware semantics; it operates on raw UTF-8 bytes.
> Strictly speaking, no contract is violated.
> However, from a Spark SQL user's perspective there is a real semantic
> gap: {{WHERE name = 'alice'}} on a UTF8_LCASE column matches {{Alice}},
> but the BloomFilter built from the same column via this API does not.
> The mismatch is silent (no exception, no warning) and easy to miss.
> h4. Possible directions (not committed)
> Listed for discussion; ticket scope is "decide a direction", not
> "prescribe one".
> * {*}Reject at API boundary{*}: throw
> {{IllegalArgumentException}} when called on a non-UTF8_BINARY
> collation column. Smallest behavioural change; would break any
> existing user (if any) silently relying on the byte-wise semantics.
> * {*}Wrap with CollationKey internally{*}: at the API call site,
> rewrite the column to {{CollationKey(col)}} before invoking
> {{bloom_filter_agg}}. Build side becomes collation-aware. Probe
> side ({{BloomFilter.mightContainString}}) would also need a
> wrapper, ideally a {{CollationAwareBloomFilter}} subclass that
> applies {{getCollationKey}} on each probe. Larger surface change;
> serialization compatibility and PySpark parity need separate
> thought.
> * {*}Documentation only{*}: clarify in scaladoc that
> {{DataFrameStatFunctions.bloomFilter}} is byte-wise regardless of
> column collation, and recommend explicit
> {{collation_key(col)}} composition. Smallest scope, leaves the
> silent gap in place.
> h4. Reproducer
> Test file
> {{sql/core/src/test/scala/org/apache/spark/sql/BloomFilterPath2ReproSuite.scala}}
> (local at this reporter's worktree; can be contributed if there is
> appetite for direction (a) or (b)).
> ----
> h3. Out of scope for this ticket
> * {{InjectRuntimeFilter}} / runtime bloom filter on join keys --
> already correct via {{RewriteCollationJoin}} (see retraction
> above).
> * User-written {{bloom_filter_agg(xxhash64(string_col))}} +
> {{might_contain(bf, xxhash64('literal'))}} -- a user is hashing
> manually; the result depends on what the user wrote.
> * {{BloomFilterAggregate.BinaryUpdater}} -- reachable only via the
> same Java-API path above; covered if that path is fixed.
> ----
> Reporter: Kent Yao
> Affects Versions: 4.0.0, 4.1.0, 5.0.0
> Component: SQL
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]