Kent Yao created SPARK-57055:
--------------------------------

             Summary: `bloom_filter_agg` and `InjectRuntimeFilter` produce silent false negatives on non-binary collation StringType
                 Key: SPARK-57055
                 URL: https://issues.apache.org/jira/browse/SPARK-57055
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 4.1.0, 5.0.0, 4.0.0
            Reporter: Kent Yao


h3. Summary

{{bloom_filter_agg}} + {{might_contain}} and the runtime Bloom filter injected by
{{InjectRuntimeFilter}} both hash the raw UTF-8 bytes via {{XxHash64(stringExpr)}},
ignoring {{StringType.collationId}}. For any non-binary collation (UTF8_LCASE,
UNICODE_CI, all ICU-backed accent/case-insensitive collations), strings that
are semantically equal under the collation but byte-different (e.g. {{'Alice'}}
vs {{'alice'}} under UTF8_LCASE) hash to different long values, so the probe
misses the filter. This violates BloomFilter's "no false negatives" contract
and silently drops matching rows in Bloom-filtered joins.
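The failure mode can be reproduced without Spark. The following is a hypothetical toy Bloom filter, not Spark's {{BloomFilterImpl}}: it hashes raw UTF-8 bytes the way the current code does, with a hand-written 64-bit FNV-1a hash standing in for {{XxHash64}}, and derives its bit positions by splitting one 64-bit hash into two 32-bit halves (double hashing).

{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.BitSet

// Hypothetical toy Bloom filter (not Spark's BloomFilterImpl).
// Bit positions come from one 64-bit hash split into two 32-bit halves.
final class ToyBloomFilter(numBits: Int, numHashes: Int) {
  private val bits = new BitSet(numBits)

  private def positions(hash: Long): Seq[Int] = {
    val h1 = hash.toInt
    val h2 = (hash >>> 32).toInt
    (1 to numHashes).map { i =>
      val combined = h1 + i * h2
      // Flip negative values so the index lands in [0, numBits).
      (if (combined < 0) ~combined else combined) % numBits
    }
  }

  def putLong(hash: Long): Unit = positions(hash).foreach(p => bits.set(p))
  def mightContainLong(hash: Long): Boolean = positions(hash).forall(p => bits.get(p))
}

object RawBytes {
  // Collation-blind: 64-bit FNV-1a over the raw UTF-8 bytes,
  // standing in for XxHash64(stringExpr).
  def hash(s: String): Long = {
    var h = 0xcbf29ce484222325L
    s.getBytes(UTF_8).foreach { b => h = (h ^ (b & 0xff)) * 0x100000001b3L }
    h
  }
}
{code}

Putting {{RawBytes.hash("Alice")}} and probing with {{RawBytes.hash("alice")}} feeds the filter two different longs, so the probe is only found if its bits happen to collide with the build's. In the typical case {{mightContainLong}} returns false even though the two strings are equal under UTF8_LCASE.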

h3. Origin

SPARK-46832 (commit {{861cca3da4c}}, Feb 2024) flipped the type pattern in
{{BloomFilterAggregate.scala}} to accept any-collation StringType but kept the
collation-blind {{BinaryUpdater}}:

{code:scala}
- case (LongType | IntegerType | ShortType | ByteType | StringType, LongType, LongType) =>
+ case (LongType | IntegerType | ShortType | ByteType | _: StringType, LongType, LongType) =>
- case StringType => BinaryUpdater
+ case _: StringType => BinaryUpdater
{code}
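The diff matters because of how the two patterns match. Below is a minimal model using hypothetical classes, not the real Catalyst types. It assumes that {{StringType}} equality compares collation ids and that the {{StringType}} object is the default UTF8_BINARY instance. Under that assumption, {{case StringType}} is an equality match that admits only the default collation, while {{_: StringType}} is a type test that admits every collation and routes all of them to the same collation-blind updater.

{code:scala}
// Hypothetical, simplified model of Catalyst types (not the real classes).
trait DataType
case object LongType extends DataType

// Assumed shape: equality is by collation id, and the companion object is
// the default (UTF8_BINARY, id 0) instance.
class StringType(val collationId: Int) extends DataType {
  override def equals(other: Any): Boolean = other match {
    case s: StringType => s.collationId == collationId
    case _             => false
  }
  override def hashCode(): Int = collationId
}
object StringType extends StringType(0)

object UpdaterDispatch {
  // Old pattern: `case StringType` compares with ==, so only id 0 matches.
  def before(dt: DataType): Option[String] = dt match {
    case LongType   => Some("LongUpdater")
    case StringType => Some("BinaryUpdater")
    case _          => None // would be rejected by type checking
  }

  // New pattern: `_: StringType` is a type test, so every collation id
  // (UTF8_LCASE, ICU collations, ...) reaches the raw-bytes updater.
  def after(dt: DataType): Option[String] = dt match {
    case LongType      => Some("LongUpdater")
    case _: StringType => Some("BinaryUpdater")
    case _             => None
  }
}
{code}

In this model, {{before(new StringType(1))}} is {{None}} while {{after(new StringType(1))}} is {{Some("BinaryUpdater")}}. The widening itself is fine; the problem is that the updater it routes to never consults {{collationId}}.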

{{InjectRuntimeFilter.scala:64,73}} builds and probes the filter with
{{new XxHash64(Seq(filterCreationSideKey))}} and
{{new XxHash64(Seq(filterApplicationSideKey))}} respectively; neither side uses
collation-aware hashing.

h3. Reproducer

Drop into {{sql/core/src/test/scala/org/apache/spark/sql/}}:

{code:scala}
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression,
  ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.test.SharedSparkSession

class BloomFilterCollationReproSuite extends QueryTest with SharedSparkSession {
  private def sessionFuncId(n: String) =
    FunctionIdentifier(n, Some(CatalogManager.SESSION_NAMESPACE),
      Some(CatalogManager.SYSTEM_CATALOG_NAME))

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark.sessionState.functionRegistry.registerFunction(
      sessionFuncId("bloom_filter_agg"),
      new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
      (cs: Seq[Expression]) => cs.size match {
        case 1 => new BloomFilterAggregate(cs.head)
        case 2 => new BloomFilterAggregate(cs.head, cs(1))
        case 3 => new BloomFilterAggregate(cs.head, cs(1), cs(2))
      })
    spark.sessionState.functionRegistry.registerFunction(
      sessionFuncId("might_contain"),
      new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
      (cs: Seq[Expression]) => BloomFilterMightContain(cs.head, cs(1)))
  }

  test("LCASE silent FN: xxhash64(Alice) build vs xxhash64(alice) probe") {
    val r = spark.sql(
      """SELECT might_contain(
        |  (SELECT bloom_filter_agg(xxhash64(name))
        |     FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
        |  xxhash64(cast('alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
      .collect()(0).getBoolean(0)
    println(s"[REPRO] LCASE 'alice' hit = $r (expect true under LCASE; observed false = BUG)")
  }

  test("LCASE baseline: same string Alice/Alice must hit") {
    val r = spark.sql(
      """SELECT might_contain(
        |  (SELECT bloom_filter_agg(xxhash64(name))
        |     FROM VALUES ('Alice' COLLATE UTF8_LCASE) AS t(name)),
        |  xxhash64(cast('Alice' COLLATE UTF8_LCASE as string)))""".stripMargin)
      .collect()(0).getBoolean(0)
    println(s"[REPRO-baseline] same-string Alice/Alice hit = $r (expect true)")
  }
}
{code}

Run: {{build/sbt 'sql/testOnly org.apache.spark.sql.BloomFilterCollationReproSuite'}}

Observed output (on master {{29fdcefb8f2}}, 5.0.0-SNAPSHOT, 2026-05-25):

{code}
[REPRO]          LCASE 'alice' hit = false  (expect true under LCASE; observed false = BUG)
[REPRO-baseline] same-string Alice/Alice hit = true  (expect true)
{code}

The repro path mirrors what {{InjectRuntimeFilter}} injects, so the same silent
false negative occurs whenever the optimizer injects a runtime filter for a
{{JOIN ON t1.s = t2.s}} where {{s}} has a non-binary collation and
{{spark.sql.optimizer.runtime.bloomFilter.enabled = true}}.

h3. Impact

- User-facing: {{bloom_filter_agg(s COLLATE UTF8_LCASE)}} + {{might_contain(...)}}
  returns false for collation-equal probes → silent wrong result.
- Optimizer: {{spark.sql.optimizer.runtime.bloomFilter.enabled}} (on by default
  in many distributions) plus non-binary collation join keys → join silently
  drops collation-equal rows. The user sees a smaller result set than the
  semantics promise, with no error or warning.
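The join-level effect can be modeled in a few lines. This hypothetical simulation compares a collation-aware nested-loop join with the same join after the probe side has been pruned by a runtime filter keyed on raw-byte hashes. FNV-1a stands in for {{XxHash64}}, lower-casing approximates UTF8_LCASE, and the filter is idealized as an exact hash set, so every dropped row is a false negative rather than a sizing artifact.

{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale

object RuntimeFilterJoinSim {
  // Collation-blind key hash: 64-bit FNV-1a over raw UTF-8 bytes,
  // standing in for XxHash64(stringExpr).
  def rawHash(s: String): Long = {
    var h = 0xcbf29ce484222325L
    s.getBytes(UTF_8).foreach { b => h = (h ^ (b & 0xff)) * 0x100000001b3L }
    h
  }

  // UTF8_LCASE-style join predicate, approximated here by lower-casing.
  def lcaseEq(a: String, b: String): Boolean =
    a.toLowerCase(Locale.ROOT) == b.toLowerCase(Locale.ROOT)

  // Reference semantics: nested-loop equi-join under the collation.
  def join(build: Seq[String], probe: Seq[String]): Seq[(String, String)] =
    for (p <- probe; b <- build if lcaseEq(p, b)) yield (p, b)

  // Same join after pruning the probe side with an idealized runtime filter:
  // an exact set of build-side hashes (a Bloom filter with no false positives).
  def joinWithRawHashFilter(build: Seq[String], probe: Seq[String]): Seq[(String, String)] = {
    val filter = build.map(rawHash).toSet
    join(build, probe.filter(p => filter.contains(rawHash(p))))
  }
}
{code}

With build keys {{Alice, Bob}} and probe keys {{alice, Bob, carol}}, the reference join returns {{(alice, Alice)}} and {{(Bob, Bob)}}, while the filtered join returns only {{(Bob, Bob)}}. The collation-equal row is removed before the join sees it, and nothing reports the loss.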

h3. Suggested remediation

Three options. We recommend (1) as it preserves the feature for the dominant
in-query use case and adds an explicit cross-version caveat for the rare
persisted case.

1. *Collation-aware hashing.* Replace {{XxHash64(stringExpr)}} build/probe
   with {{XxHash64}} over {{CollationFactory.getCollationKey(collationId, str)}}
   bytes. Query-internal BF (the dominant case: runtime BF, single-query
   {{bloom_filter_agg}}) is fully correct because build and probe run within one
   query on one Spark build, and therefore one icu4j version. The cost is one
   {{Collator.getCollationKey}} call per hashed value on both the build and the
   probe side (with a cached {{Collator}} instance per {{collationId}}).

   Caveat: BF results that are persisted (table write, RDD checkpoint, Connect
   cross-version client/server) become invalid across icu4j upgrades. In a
   cross-version probe, icu4j 75 → 76 altered 23 of 33 cells (e.g.
   {{en_US 'a': 0x2a → 0x2b}}) and 77.1 → 78.3 altered 4 of 33 (see PR #56096,
   whose {{ICUCollationSortKeyGoldenSuite}} is the cross-version visibility
   golden). We recommend documenting {{bloom_filter_agg}} output as not stable
   across versions, similar to the existing {{Collation*Benchmark*-results.txt}}
   convention.

2. *Reject non-binary StringType* in {{BloomFilterAggregate.checkInputDataTypes}}
   with a typed error ({{BLOOM_FILTER_NON_BINARY_COLLATION_NOT_SUPPORTED}} or
   similar), and skip injection in {{InjectRuntimeFilter}} for non-binary string
   join keys.
   Safe and minimal; removes a valid in-query use case. Fallback if (1) is
   rejected on cost grounds.

3. *Document-only* (recommend {{STRING COLLATE UTF8_BINARY}} for BF keys).
   Inadequate; the bug has been silent for ~2 years and added documentation
   will not surface it to users hit via default-on {{runtime.bloomFilter}}.
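Option 1 can be sketched outside Spark as follows: hash the collation sort key instead of the raw bytes, with one cached collator per collation id. This is an illustration under stated assumptions, not the proposed patch. {{java.text.Collator}} is a JDK stand-in for icu4j's {{Collator}}, FNV-1a stands in for {{XxHash64}}, {{CollationFactory.getCollationKey}} is not called, and the id-to-locale table is hypothetical.

{code:scala}
import java.nio.charset.StandardCharsets.UTF_8
import java.text.Collator
import java.util.Locale
import scala.collection.mutable

object CollationAwareHashing {
  // 64-bit FNV-1a, standing in for XxHash64.
  def fnv1a64(bytes: Array[Byte]): Long = {
    var h = 0xcbf29ce484222325L
    bytes.foreach { b => h = (h ^ (b & 0xff)) * 0x100000001b3L }
    h
  }

  // Hypothetical id table for this sketch: id 1 means "en_US, case-insensitive".
  val EnUsCi = 1
  private val locales: Map[Int, Locale] = Map(EnUsCi -> Locale.US)

  // One cached Collator per (thread, collation id). java.text.Collator is not
  // documented as thread-safe, so the cache is thread-local.
  private val cache = new ThreadLocal[mutable.Map[Int, Collator]] {
    override def initialValue(): mutable.Map[Int, Collator] = mutable.Map.empty[Int, Collator]
  }

  private def collatorFor(id: Int): Collator =
    cache.get().getOrElseUpdate(id, {
      val c = Collator.getInstance(locales(id))
      c.setStrength(Collator.SECONDARY) // case is a tertiary difference: ignored
      c
    })

  // Current behavior: hash the raw UTF-8 bytes (collation-blind).
  def hashRaw(s: String): Long = fnv1a64(s.getBytes(UTF_8))

  // Option 1: hash the sort-key bytes, so strings that compare equal under
  // the collation yield the same long on the build and probe sides.
  def hashCollated(id: Int, s: String): Long =
    fnv1a64(collatorFor(id).getCollationKey(s).toByteArray())
}
{code}

Because the hashed bytes are a sort key, equal hashes are only guaranteed while build and probe use the same collator implementation and version; that is the persistence caveat under item 1.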

h3. Why this is a correctness bug (not a perf / known-limit)

The BloomFilter contract is "no false negatives". The current behavior
produces silent false negatives under a feature (non-binary collation) that
the type system explicitly accepts. There is no error, warning, or
documented caveat. Users querying a SQL surface that says "collated string
equality is supported" receive results inconsistent with that contract.

h3. References

- Commit {{861cca3da4c}}: SPARK-46832 {{[SQL] Introducing Collate and Collation expressions}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala:81,159}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:64,73}}
- {{sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CollationFactory.java}}:
  {{getCollationKey(id, str)}} is the canonical sort-key API
- PR apache/spark#56096: {{ICUCollationSortKeyGoldenSuite}} provides the
  cross-version drift evidence cited in the caveat above


