brijrajk commented on PR #12151:
URL: https://github.com/apache/gluten/pull/12151#issuecomment-4687266314
@philo-he You are absolutely right. We confirmed it with a test case.
**How threshold and cost work**
`ExpandFallbackPolicy` counts the number of columnar↔row conversion boundaries inside a stage. If that count (the cost) meets or exceeds `COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD`, the entire stage is wrapped in a `FallbackNode` and runs as plain Spark.
| Scenario | Threshold | Stage 0 cost | Stage 1 cost | Outcome |
|---|---|---|---|---|
| Original fix (PR as-is) | 2 | 1 → native ✓ | 2 → whole-stage fallback | Stage 0 Velox bytes, Stage 1 JVM; **patcher correct** |
| Your scenario | 1 | 1 → whole-stage fallback | ≥ 1 → whole-stage fallback | Stage 0 **Spark bytes**, Stage 1 JVM; **patcher misfires** |
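A minimal Scala model of the decision in the table, assuming (as described above) that the policy compares a per-stage transition count against the threshold with a "meets" (`>=`) check. `Decision`, `StageModel`, and `FallbackModel.decide` are hypothetical names for illustration, not Gluten APIs, and treating a stage as a linear chain of operators is a simplification; the real `ExpandFallbackPolicy` works on Spark physical plan trees.

```scala
// Simplified model of the whole-stage fallback decision described above.
// Hypothetical names throughout; this is not Gluten's ExpandFallbackPolicy.
sealed trait Decision
case object RunNative extends Decision
case object WholeStageFallback extends Decision

// Assumption: a stage is modeled as a linear chain of operators, each either
// offloaded to the native backend (true) or left on the JVM (false).
final case class StageModel(nativeFlags: Seq[Boolean]) {
  // Cost = number of columnar<->row boundaries between adjacent operators.
  def transitionCost: Int =
    nativeFlags.zip(nativeFlags.drop(1)).count { case (a, b) => a != b }
}

object FallbackModel {
  // "Meets the threshold" is modeled as cost >= threshold.
  def decide(cost: Int, threshold: Int): Decision =
    if (cost >= threshold) WholeStageFallback else RunNative
}
```

For example, `FallbackModel.decide(cost = 1, threshold = 2)` yields `RunNative` (Stage 0 in the original fix), while `FallbackModel.decide(cost = 1, threshold = 1)` yields `WholeStageFallback` (Stage 0 in your scenario).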
**Test case confirming the failure**
```scala
testGluten(
  "Test bloom_filter_agg whole-stage fallback when both stages fall back",
  Issue12013) {
  ...
  if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
    // threshold=1: Stage 0's inherent transition cost of 1 meets the threshold, so
    // ExpandFallbackPolicy promotes Stage 0 to a whole-stage fallback as well.
    // Stage 0 runs as Spark and produces Spark-format bytes. Stage 1 also falls back.
    // The patcher must NOT rewrite BloomFilterMightContain -> VeloxBloomFilterMightContain
    // in this case.
    withSQLConf(
      GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
      GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
      SQLConf.ANSI_ENABLED.key -> "false"
    ) {
      val df = spark.sql(sqlString)
      assert(df.collect().length == 200003L)
    }
  }
}
```
**Output**
```
- Gluten - Test bloom_filter_agg whole-stage fallback when both stages fall back *** FAILED ***
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 7.0:
org.apache.gluten.exception.GlutenException:
Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: (1 vs. 0)
Retriable: False
Expression: kBloomFilterV1 == version
Function: mayContain
File: velox/common/base/BloomFilter.h
Line: 70
  at org.apache.gluten.utils.VeloxBloomFilterJniWrapper.mightContainLongOnSerializedBloom(Native Method)
  at org.apache.gluten.utils.VeloxBloomFilter.mightContainLongOnSerializedBloom(VeloxBloomFilter.java:163)
  ...
Tests: succeeded 1, failed 1
```
`kBloomFilterV1 == version` failing with `(1 vs. 0)` is exactly the version-byte mismatch: Velox's reader expected its own version byte (`1`) but read `0` from the Spark-serialized filter.
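A small sketch of why a Spark-serialized filter can trip this check. It assumes, from our reading of the two implementations (worth double-checking against the exact versions in use), that Spark's `BloomFilterImpl.writeTo` starts the stream with its version number written via `DataOutputStream.writeInt` (4 bytes, big-endian), while Velox's `mayContain` reads a single `int8` version byte and compares it with `kBloomFilterV1` (`1`). `VersionByteDemo`, `sparkStyleHeader`, and `veloxVersionByte` are hypothetical helpers, not real APIs.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object VersionByteDemo {
  // Assumption: Spark's header is (version: Int, numHashFunctions: Int),
  // both written big-endian by DataOutputStream.writeInt.
  def sparkStyleHeader(version: Int, numHashFunctions: Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(version)
    out.writeInt(numHashFunctions)
    out.flush()
    bytes.toByteArray
  }

  // Assumption: Velox interprets only the first byte as its version.
  def veloxVersionByte(serialized: Array[Byte]): Byte = serialized(0)

  // Mirrors the kBloomFilterV1 constant named in the error message.
  val VeloxBloomFilterV1: Byte = 1
}
```

Under these assumptions, a header written with version `1` starts with the bytes `0, 0, 0, 1`, so Velox sees a leading `0` where it expects `1`, which matches the `(1 vs. 0)` in the log.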
**Proposed fix**
The root cause is that `enableNativeBloomFilter` answers "is the native bloom filter enabled in config?", but the right question is "did Stage 0 actually run natively?" The fix is to make the guard structural: inside `patchBloomFilterMightContain`, before rewriting, inspect the physical plan referenced by `bloomFilterExpression`. If Stage 0's plan is itself a `FallbackNode`, it will produce Spark-format bytes, and Stage 1 must be left with the vanilla `BloomFilterMightContain`.
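A minimal sketch of the proposed structural guard, using a toy plan tree instead of Spark's `SparkPlan`. `PlanNode`, `StagePlan`, `FallbackNodeModel`, `MightContain`, `VeloxMightContain`, and `BloomFilterPatcher.patchMightContain` are hypothetical stand-ins for `FallbackNode`, `BloomFilterMightContain`, `VeloxBloomFilterMightContain`, and `patchBloomFilterMightContain`. The real patcher would still need to locate Stage 0's plan through the subquery behind `bloomFilterExpression`, which this sketch does not attempt.

```scala
// Toy model of the proposed guard; none of these types are Gluten or Spark APIs.
sealed trait PlanNode
final case class StagePlan(name: String) extends PlanNode
// Stand-in for FallbackNode: the wrapped stage runs as plain Spark.
final case class FallbackNodeModel(child: PlanNode) extends PlanNode

// The bloom filter probe, before and after the rewrite.
// `producer` is the plan that builds the serialized bloom filter (Stage 0).
sealed trait Probe
final case class MightContain(producer: PlanNode) extends Probe
final case class VeloxMightContain(producer: PlanNode) extends Probe

object BloomFilterPatcher {
  // Rewrite only when the native bloom filter is enabled AND the producing
  // stage did not fall back as a whole; otherwise its bytes are Spark-format.
  def patchMightContain(probe: Probe, nativeBloomFilterEnabled: Boolean): Probe =
    probe match {
      case MightContain(_: FallbackNodeModel) => probe
      case MightContain(producer) if nativeBloomFilterEnabled =>
        VeloxMightContain(producer)
      case other => other
    }
}
```

Because the guard looks at the producer's shape rather than only at the config flag, it gives the same answer for any threshold value, including the threshold=1 case from the test.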
Do you see any concerns with this approach, or is there a cleaner way you
would handle it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]