brijrajk commented on PR #12151:
URL: https://github.com/apache/gluten/pull/12151#issuecomment-4687266314

   @philo-he You are absolutely right. We confirmed it with a test case.
   
   **How threshold and cost work**
   
   `ExpandFallbackPolicy` counts the number of columnar↔row conversion
   boundaries inside a stage. If that count (the cost) reaches or exceeds
   `COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD`, the entire stage is wrapped in a
   `FallbackNode` and runs on vanilla Spark.
   
   | Scenario | Threshold | Stage 0 cost | Stage 1 cost | Outcome |
   |---|---|---|---|---|
   | Original fix (PR as-is) | 2 | 1 → native ✓ | 2 → whole-stage fallback | Stage 0 Velox bytes, Stage 1 JVM: **patcher correct** |
   | Your scenario | 1 | 1 → whole-stage fallback | ≥ 1 → whole-stage fallback | Stage 0 **Spark bytes**, Stage 1 JVM: **patcher misfires** |
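   
   To make the cost/threshold comparison concrete, here is a minimal sketch of the decision. It is a simplified model, not Gluten's actual implementation: `Op`, `transitionCost` and `fallsBackWholeStage` are hypothetical names, and the real policy counts transitions with more nuance than one per parent/child edge.
   
   ```scala
   // Hypothetical stand-in for a physical operator: `native` marks whether it
   // would run in the native (columnar) backend or in vanilla (row) Spark.
   final case class Op(name: String, native: Boolean, children: Seq[Op] = Nil)
   
   object FallbackSketch {
     // Count parent/child edges where execution switches between native and vanilla.
     def transitionCost(op: Op): Int =
       op.children.map { child =>
         val boundary = if (child.native != op.native) 1 else 0
         boundary + transitionCost(child)
       }.sum
   
     // Assumed rule: the whole stage falls back once the cost reaches the threshold.
     def fallsBackWholeStage(stage: Op, threshold: Int): Boolean =
       transitionCost(stage) >= threshold
   }
   ```
   
   With a stage that has a single boundary (cost 1), `fallsBackWholeStage(stage, 2)` is `false` and `fallsBackWholeStage(stage, 1)` is `true`, which matches the Stage 0 column in both rows of the table.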
   
   **Test case confirming the failure**
   
   ```scala
   testGluten(
     "Test bloom_filter_agg whole-stage fallback when both stages fall back",
     Issue12013) {
     ...
     if (BackendsApiManager.getSettings.requireBloomFilterAggMightContainJointFallback()) {
       // threshold=1: Stage 0's inherent transition cost of 1 meets the threshold, so
       // ExpandFallbackPolicy promotes Stage 0 to a whole-stage fallback as well.
       // Stage 0 runs as Spark and produces Spark-format bytes. Stage 1 also falls back.
       // The patcher must NOT rewrite BloomFilterMightContain -> VeloxBloomFilterMightContain
       // in this case.
       withSQLConf(
         GlutenConfig.COLUMNAR_FILTER_ENABLED.key -> "false",
         GlutenConfig.COLUMNAR_WHOLESTAGE_FALLBACK_THRESHOLD.key -> "1",
         SQLConf.ANSI_ENABLED.key -> "false"
       ) {
         val df = spark.sql(sqlString)
         assert(df.collect().length == 200003L)
       }
     }
   }
   ```
   
   **Output**
   
   ```
   - Gluten - Test bloom_filter_agg whole-stage fallback when both stages fall back *** FAILED ***
     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times,
     most recent failure: Lost task 0.0 in stage 7.0: org.apache.gluten.exception.GlutenException:
     Exception: VeloxUserError
     Error Source: USER
     Error Code: INVALID_ARGUMENT
     Reason: (1 vs. 0)
     Retriable: False
     Expression: kBloomFilterV1 == version
     Function: mayContain
     File: velox/common/base/BloomFilter.h
     Line: 70
   
       at org.apache.gluten.utils.VeloxBloomFilterJniWrapper.mightContainLongOnSerializedBloom(Native Method)
       at org.apache.gluten.utils.VeloxBloomFilter.mightContainLongOnSerializedBloom(VeloxBloomFilter.java:163)
       ...
   
   Tests: succeeded 1, failed 1
   ```
   
   `kBloomFilterV1 == version` failing with `(1 vs. 0)` is a version-byte
   mismatch: Velox's reader expected its own version marker (`1`) but read `0`
   from the Spark-format bytes.
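   
   A small sketch of why the leading byte can come out as `0`. It rests on two assumptions about the serialized layouts that are not stated in this thread: that the Spark-side filter starts with its version written as a 4-byte big-endian int, and that the Velox-side reader checks a single leading version byte. `VersionByteSketch` and its members are illustrative names only.
   
   ```scala
   import java.io.{ByteArrayOutputStream, DataOutputStream}
   
   object VersionByteSketch {
     // Expected version marker, named after the constant in the failing check.
     val kBloomFilterV1: Byte = 1
   
     // Assumed Spark-style header: version 1 written as a 4-byte big-endian int,
     // so the bytes are 00 00 00 01 and the first byte is 0.
     def sparkStyleHeader(): Array[Byte] = {
       val buf = new ByteArrayOutputStream()
       val out = new DataOutputStream(buf)
       out.writeInt(1)
       out.flush()
       buf.toByteArray
     }
   
     // Assumed Velox-style header: the version written as one byte.
     def veloxStyleHeader(): Array[Byte] = Array(kBloomFilterV1)
   
     // Same shape as the failing check: compare the first byte to the expected version.
     def leadingVersionMatches(bytes: Array[Byte]): Boolean =
       bytes.nonEmpty && bytes(0) == kBloomFilterV1
   }
   ```
   
   Under those assumptions, `leadingVersionMatches(sparkStyleHeader())` is `false` because the first byte read is `0`, which lines up with the `(1 vs. 0)` in the error.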
   
   **Proposed fix**
   
   The root cause is that `enableNativeBloomFilter` answers "is native bloom 
filter on in config?" but the right question is "did Stage 0 actually run 
natively?" The fix is to make the guard structural: inside 
`patchBloomFilterMightContain`, before rewriting, inspect the physical plan 
referenced by `bloomFilterExpression`. If Stage 0's plan is itself a 
`FallbackNode`, it will produce Spark-format bytes and Stage 1 must be left 
with the vanilla `BloomFilterMightContain`.
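   
   Roughly, the guard could look like the sketch below. This is a self-contained toy model, not the PR's code: the `Plan` and `Expr` types are simplified stand-ins for the real Spark/Gluten classes (the real `BloomFilterMightContain` also carries a value expression), and `producesVeloxBytes` is a hypothetical helper name.
   
   ```scala
   // Simplified stand-ins for physical plan nodes.
   sealed trait Plan
   final case class NativeStage(name: String) extends Plan
   final case class FallbackNode(child: Plan) extends Plan
   
   // Simplified stand-ins for the two expressions; each only keeps the plan
   // that builds the bloom filter (Stage 0).
   sealed trait Expr
   final case class BloomFilterMightContain(bloomFilterPlan: Plan) extends Expr
   final case class VeloxBloomFilterMightContain(bloomFilterPlan: Plan) extends Expr
   
   object PatcherSketch {
     // Structural check: a stage wrapped in FallbackNode runs as vanilla Spark
     // and therefore emits Spark-format bytes.
     def producesVeloxBytes(stage0: Plan): Boolean = stage0 match {
       case _: FallbackNode => false
       case _               => true
     }
   
     // Rewrite only when Stage 0 actually ran natively; otherwise leave the
     // vanilla expression untouched.
     def patchBloomFilterMightContain(expr: Expr): Expr = expr match {
       case BloomFilterMightContain(plan) if producesVeloxBytes(plan) =>
         VeloxBloomFilterMightContain(plan)
       case other => other
     }
   }
   ```
   
   The point of the design is that the decision depends on the plan shape rather than on a config flag, so it stays correct whatever threshold caused the fallback.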
   
   Do you see any concerns with this approach, or is there a cleaner way you 
would handle it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

