Kent Yao created SPARK-57055:
--------------------------------
Summary: `bloom_filter_agg` and `InjectRuntimeFilter` produce silent false negatives on non-binary collation StringType
Key: SPARK-57055
URL: https://issues.apache.org/jira/browse/SPARK-57055
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 4.1.0, 5.0.0, 4.0.0
Reporter: Kent Yao
h3. Summary
{{bloom_filter_agg}} + {{might_contain}} and the runtime bloom filter injected by {{InjectRuntimeFilter}} both hash raw UTF-8 bytes via {{XxHash64(stringExpr)}}, ignoring {{StringType.collationId}}. For any non-binary collation (UTF8_LCASE, UNICODE_CI, and all ICU-backed accent- or case-insensitive collations), strings that are equal under the collation but differ byte-wise (e.g. {{'Alice'}} vs {{'alice'}} under UTF8_LCASE) hash to different long values and miss the bloom filter. This violates BloomFilter's "no false negatives" contract and silently drops matching rows in BF-filtered joins.
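The mechanism can be reproduced outside Spark. The sketch below is illustrative only: it uses a toy bloom filter rather than Spark's {{org.apache.spark.util.sketch.BloomFilter}}, a 64-bit hash built from {{scala.util.hashing.MurmurHash3}} as a stand-in for {{XxHash64}}, and lower-casing as an assumed approximation of the UTF8_LCASE collation key. The names {{ToyBloom}}, {{hash64}}, {{rawKey}}, {{lcaseKey}} and {{probeHit}} are hypothetical.

{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import scala.util.hashing.MurmurHash3

object BloomFalseNegativeSketch {
  // Toy bloom filter keyed by a 64-bit hash; double hashing (h1 + i * h2)
  // derives numHashes bit positions. Not Spark's BloomFilterImpl.
  final class ToyBloom(numBits: Int, numHashes: Int) {
    private val bits = new java.util.BitSet(numBits)

    private def positions(h: Long): Seq[Int] = {
      val h1 = h.toInt
      val h2 = (h >>> 32).toInt
      (1 to numHashes).map(i => Math.floorMod(h1 + i * h2, numBits))
    }

    def put(h: Long): Unit = positions(h).foreach(p => bits.set(p))
    def mightContain(h: Long): Boolean = positions(h).forall(p => bits.get(p))
  }

  // Stand-in for XxHash64: two seeded 32-bit MurmurHash3 passes packed into a Long.
  def hash64(bytes: Array[Byte]): Long = {
    val lo = MurmurHash3.bytesHash(bytes, 0x1b873593)
    val hi = MurmurHash3.bytesHash(bytes, 0x5bd1e995)
    (hi.toLong << 32) | (lo.toLong & 0xffffffffL)
  }

  // What the current code hashes: raw UTF-8 bytes, collation ignored.
  val rawKey: String => Array[Byte] = s => s.getBytes(UTF_8)

  // Assumed UTF8_LCASE collation key for this sketch: lower-cased UTF-8 bytes.
  val lcaseKey: String => Array[Byte] = s => s.toLowerCase(Locale.ROOT).getBytes(UTF_8)

  // Build a filter from one value, then probe it with another; both go through keyFn.
  def probeHit(keyFn: String => Array[Byte], built: String, probed: String): Boolean = {
    val bf = new ToyBloom(numBits = 1 << 16, numHashes = 3)
    bf.put(hash64(keyFn(built)))
    bf.mightContain(hash64(keyFn(probed)))
  }
}
{code}

{{probeHit(lcaseKey, "Alice", "alice")}} is always true because both sides hash identical bytes, while {{probeHit(rawKey, "Alice", "alice")}} is false barring a hash collision: a collation-equal probe misses, which is exactly the false negative a bloom filter must never produce.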
h3. Origin
SPARK-46832 (commit {{861cca3da4c}}, Feb 2024) flipped the type pattern in
{{BloomFilterAggregate.scala}} to accept any-collation StringType but kept the
collation-blind {{BinaryUpdater}}:
{code:scala}
- case (LongType | IntegerType | ShortType | ByteType | StringType, LongType, LongType) =>
+ case (LongType | IntegerType | ShortType | ByteType | _: StringType, LongType, LongType) =>
- case StringType => BinaryUpdater
+ case _: StringType => BinaryUpdater
{code}
Likewise, {{InjectRuntimeFilter.scala:64,73}} builds the filter with {{new XxHash64(Seq(filterCreationSideKey))}} and probes it with {{new XxHash64(Seq(filterApplicationSideKey))}}, with no collation-aware hashing on either side.
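Why the widened pattern is too broad can be shown with a minimal, hypothetical type model (not Spark's classes): once {{StringType}} carries a {{collationId}}, the type pattern {{_: StringType}} matches every collation and routes all of them to the byte-level updater. Treating collation id 0 as UTF8_BINARY and the {{CollationKeyUpdater}} name are assumptions made for this sketch.

{code:scala}
object UpdaterDispatchSketch {
  // Hypothetical simplified type model: collationId 0 stands for UTF8_BINARY.
  sealed trait DataType
  case object LongType extends DataType
  final case class StringType(collationId: Int) extends DataType {
    def supportsBinaryEquality: Boolean = collationId == 0
  }

  sealed trait Updater
  case object LongUpdater extends Updater
  case object BinaryUpdater extends Updater       // hashes raw UTF-8 bytes
  case object CollationKeyUpdater extends Updater // hypothetical: hashes collation-key bytes

  // Shape after SPARK-46832: every StringType, whatever its collation, gets BinaryUpdater.
  def currentDispatch(dt: DataType): Updater = dt match {
    case LongType      => LongUpdater
    case _: StringType => BinaryUpdater
  }

  // One possible fix: only binary-equality collations may use the raw-byte updater.
  def collationAwareDispatch(dt: DataType): Updater = dt match {
    case LongType                                    => LongUpdater
    case st: StringType if st.supportsBinaryEquality => BinaryUpdater
    case _: StringType                               => CollationKeyUpdater
  }
}
{code}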
h3. Reproducer
Drop into {{sql/core/src/test/scala/org/apache/spark/sql/}}:
{code:scala}
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.test.SharedSparkSession

class BloomFilterCollationReproSuite extends QueryTest with SharedSparkSession {

  private def sessionFuncId(n: String) =
    FunctionIdentifier(n, Some(CatalogManager.SESSION_NAMESPACE),
      Some(CatalogManager.SYSTEM_CATALOG_NAME))

  // Register bloom_filter_agg and might_contain as session functions
  // so the SQL in the tests below can call them.
  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.sessionState.functionRegistry.registerFunction(
      sessionFuncId("bloom_filter_agg"),
      new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
      (cs: Seq[Expression]) => cs.size match {
        case 1 => new BloomFilterAggregate(cs.head)
        case 2 => new BloomFilterAggregate(cs.head, cs(1))
        case 3 => new BloomFilterAggregate(cs.head, cs(1), cs(2))
      })
    spark.sessionState.functionRegistry.registerFunction(
      sessionFuncId("might_contain"),
      new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
      (cs: Seq[Expression]) => BloomFilterMightContain(cs.head, cs(1)))
  }

  // Build the filter from 'Alice', probe with 'alice' (equal under UTF8_LCASE).
  test("LCASE silent FN: xxhash64(Alice) build vs xxhash64(alice) probe") {
    val r = spark.sql(
      """SELECT might_contain(
        |  (SELECT bloom_filter_agg(xxhash64(name))
        |   FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
        |  xxhash64(cast('alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
      .collect()(0).getBoolean(0)
    println(s"[REPRO] LCASE 'alice' hit = $r (expect true under LCASE; observed false = BUG)")
  }

  // Control: build and probe with the byte-identical string 'Alice'.
  test("LCASE baseline: same string Alice/Alice must hit") {
    val r = spark.sql(
      """SELECT might_contain(
        |  (SELECT bloom_filter_agg(xxhash64(name))
        |   FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
        |  xxhash64(cast('Alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
      .collect()(0).getBoolean(0)
    println(s"[REPRO-baseline] same-string Alice/Alice hit = $r (expect true)")
  }
}
{code}
Run: {{build/sbt 'sql/testOnly org.apache.spark.sql.BloomFilterCollationReproSuite'}}
Observed output (on master {{29fdcefb8f2}}, 5.0.0-SNAPSHOT, 2026-05-25):
{code}
[REPRO] LCASE 'alice' hit = false (expect true under LCASE; observed false = BUG)
[REPRO-baseline] same-string Alice/Alice hit = true (expect true)
{code}
The repro path mirrors what {{InjectRuntimeFilter}} injects, so the same silent false negative reaches any {{JOIN ON t1.s = t2.s}} where {{s}} has a non-binary collation and {{spark.sql.optimizer.runtime.bloomFilter.enabled = true}}.
h3. Impact
- User-facing: {{bloom_filter_agg(s COLLATE UTF8_LCASE)}} + {{might_contain(...)}} returns false for collation-equal probes → silently wrong result.
- Optimizer: {{spark.sql.optimizer.runtime.bloomFilter.enabled}} (enabled by default) plus non-binary collation join keys → the join silently drops collation-equal rows. The user sees a smaller result set than the semantics promise, with no error or warning.
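The optimizer impact can be simulated without Spark. In this hypothetical sketch, an exact {{Set}} of build-side hashes stands in for the runtime bloom filter, so any dropped row is caused by the hash input alone and not by filter sizing; lower-casing again approximates UTF8_LCASE equality and its collation key, and a MurmurHash3-based hash stands in for {{XxHash64}}.

{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import scala.util.hashing.MurmurHash3

object RuntimeFilterJoinSketch {
  // Stand-in for XxHash64: two seeded 32-bit MurmurHash3 passes packed into a Long.
  def hash64(bytes: Array[Byte]): Long = {
    val lo = MurmurHash3.bytesHash(bytes, 0x1b873593)
    val hi = MurmurHash3.bytesHash(bytes, 0x5bd1e995)
    (hi.toLong << 32) | (lo.toLong & 0xffffffffL)
  }

  val rawKey: String => Array[Byte] = s => s.getBytes(UTF_8)
  val lcaseKey: String => Array[Byte] = s => s.toLowerCase(Locale.ROOT).getBytes(UTF_8)

  // Join predicate under an assumed UTF8_LCASE equality.
  def lcaseEquals(a: String, b: String): Boolean =
    a.toLowerCase(Locale.ROOT) == b.toLowerCase(Locale.ROOT)

  // Inner join of probe rows against build rows. When keyFn is given, the probe side is
  // first pre-filtered by the build-side hashes, as an injected runtime filter would do.
  def join(build: Seq[String], probe: Seq[String],
           keyFn: Option[String => Array[Byte]]): Seq[(String, String)] = {
    val probeRows = keyFn match {
      case None => probe
      case Some(f) =>
        val filter: Set[Long] = build.map(b => hash64(f(b))).toSet
        probe.filter(p => filter.contains(hash64(f(p))))
    }
    for (p <- probeRows; b <- build if lcaseEquals(p, b)) yield (p, b)
  }
}
{code}

With {{build = Seq("Alice", "Bob")}} and {{probe = Seq("alice", "BOB", "carol")}}, the unfiltered join and the {{lcaseKey}}-filtered join both return two rows, while the {{rawKey}}-filtered join returns none: the pre-filter removes every collation-equal match before the join sees it.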
h3. Suggested remediation
Three options are outlined below. We recommend (1) because it preserves the feature for the dominant in-query use case and needs only an explicit cross-version caveat for the rare persisted case.
1. *Collation-aware hashing.* Replace the {{XxHash64(stringExpr)}} build/probe hashing with {{XxHash64}} over the {{CollationFactory.getCollationKey(collationId, str)}} bytes. Query-internal BFs (the dominant case: runtime BF and single-query {{bloom_filter_agg}}) are then fully correct, because build and probe share one JVM and one icu4j version. The cost is one {{Collator.getCollationKey}} call per hashed value, with a cached {{Collator}} instance per {{collationId}}.
Caveat: BF results that are persisted (table write, RDD checkpoint, Connect cross-version client/server) become invalid across icu4j upgrades. In a cross-version probe, icu4j 75 → 76 altered 23/33 cells (e.g. {{en_US 'a': 0x2a → 0x2b}}) and 77.1 → 78.3 altered 4/33 (see the {{ICUCollationSortKeyGoldenSuite}} cross-version golden in PR #56096). We recommend documenting {{bloom_filter_agg}} output as not stable across versions, similar to the existing {{Collation*Benchmark*-results.txt}} convention.
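A minimal sketch of option (1), assuming a dependency-free setup: the JDK's {{java.text.Collator}} replaces icu4j, a hypothetical mapping from collation id to collator strength replaces {{CollationFactory}}, and a MurmurHash3-based 64-bit hash replaces {{XxHash64}}. It illustrates the two properties option (1) relies on: collation-equal strings yield identical sort-key bytes (hence identical hashes on build and probe), and each {{Collator}} is built once per collation id and then reused.

{code:scala}
import java.text.Collator
import java.util.Locale
import scala.collection.concurrent.TrieMap
import scala.util.hashing.MurmurHash3

object CollationAwareHashSketch {
  // Hypothetical collation ids used only in this sketch; real ids come from CollationFactory.
  val UnicodeCi: Int = 1   // case-insensitive, accent-sensitive
  val UnicodeCiAi: Int = 2 // case- and accent-insensitive

  // One Collator per collation id, created on first use and reused for later rows.
  private val collators = TrieMap.empty[Int, Collator]

  private def collatorFor(collationId: Int): Collator =
    collators.getOrElseUpdate(collationId, {
      val c = Collator.getInstance(Locale.ROOT)
      c.setStrength(collationId match {
        case UnicodeCi   => Collator.SECONDARY // ignores case differences
        case UnicodeCiAi => Collator.PRIMARY   // ignores case and accent differences
        case other       => throw new IllegalArgumentException(s"unknown collation id $other")
      })
      c
    })

  // Sort-key bytes: identical for strings the collator treats as equal.
  def collationKeyBytes(collationId: Int, s: String): Array[Byte] =
    collatorFor(collationId).getCollationKey(s).toByteArray

  // Stand-in for XxHash64: two seeded 32-bit MurmurHash3 passes packed into a Long.
  def hash64(bytes: Array[Byte]): Long = {
    val lo = MurmurHash3.bytesHash(bytes, 0x1b873593)
    val hi = MurmurHash3.bytesHash(bytes, 0x5bd1e995)
    (hi.toLong << 32) | (lo.toLong & 0xffffffffL)
  }

  // Value fed to both the build side (put) and the probe side (mightContain).
  def collationAwareHash(collationId: Int, s: String): Long =
    hash64(collationKeyBytes(collationId, s))
}
{code}

Only the hash input changes; the filter and the build/probe plumbing stay as they are. Because the key bytes depend on the collator's rules, the hash is only as stable as the collation library version, which is the persisted-BF caveat above.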
2. *Reject non-binary StringType.* Fail {{BloomFilterAggregate.checkInputDataTypes}} with a typed error ({{BLOOM_FILTER_NON_BINARY_COLLATION_NOT_SUPPORTED}} or similar), and skip injection in {{InjectRuntimeFilter}} for non-binary string join keys. Safe and minimal, but it removes a valid in-query use case. This is the fallback if (1) is rejected on cost grounds.
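The two guards of option (2) reduce to a binary-equality check on the string type. This is a hypothetical sketch: {{Accepted}}/{{Rejected}} are simplified stand-ins for Catalyst's {{TypeCheckResult}}, collation id 0 is assumed to be UTF8_BINARY, and the error class is the name proposed above, not an existing one.

{code:scala}
object RejectNonBinarySketch {
  // Hypothetical model: collation id 0 stands for UTF8_BINARY (binary equality).
  final case class StringType(collationId: Int) {
    def supportsBinaryEquality: Boolean = collationId == 0
  }

  // Simplified stand-in for Catalyst's TypeCheckResult.
  sealed trait CheckResult
  case object Accepted extends CheckResult
  final case class Rejected(errorClass: String, message: String) extends CheckResult

  // Guard option (2) would add to BloomFilterAggregate.checkInputDataTypes.
  def checkBloomFilterInput(st: StringType): CheckResult =
    if (st.supportsBinaryEquality) Accepted
    else Rejected(
      "BLOOM_FILTER_NON_BINARY_COLLATION_NOT_SUPPORTED", // proposed name, not an existing error class
      s"collation id ${st.collationId} does not support binary equality")

  // Guard option (2) would add to InjectRuntimeFilter: inject only for binary-equality keys.
  def canInjectBloomFilter(joinKey: StringType): Boolean = joinKey.supportsBinaryEquality
}
{code}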
3. *Document-only* (recommend {{STRING COLLATE UTF8_BINARY}} for BF keys). Inadequate: the bug has been silent for ~2 years, and documentation alone will not reach users who hit it through the default-on {{runtime.bloomFilter}} path.
h3. Why this is a correctness bug (not a perf / known-limit)
The BloomFilter contract is "no false negatives". The current behavior produces silent false negatives for a feature (non-binary collation) that the type system explicitly accepts, with no error, warning, or documented caveat. Users of a SQL surface that advertises collated string equality receive results inconsistent with that contract.
h3. References
- Commit {{861cca3da4c}}: SPARK-46832 {{[SQL] Introducing Collate and Collation expressions}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala:81,159}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:64,73}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CollationFactory.java}}: {{getCollationKey(id, str)}} is the canonical sort-key API
- PR apache/spark#56096: {{ICUCollationSortKeyGoldenSuite}} provides the cross-version drift evidence cited in the caveat above