[ 
https://issues.apache.org/jira/browse/SPARK-43908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-43908:
-----------------------------------
    Labels: pull-request-available  (was: )

> use bigger rowCount to initialize BloomFilterAggregate in InjectRuntimeFilter
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43908
>                 URL: https://issues.apache.org/jira/browse/SPARK-43908
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yann Byron
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2023-06-01-13-13-21-168.png, 
> image-2023-06-01-13-14-07-265.png
>
>
> Currently, the `InjectRuntimeFilter` rule always uses the row count of 
> `filterCreationSidePlan` to initialize BloomFilterAggregate. See below:
> {code:java}
> private def injectBloomFilter(
>     filterApplicationSideExp: Expression,
>     filterApplicationSidePlan: LogicalPlan,
>     filterCreationSideExp: Expression,
>     filterCreationSidePlan: LogicalPlan): LogicalPlan = {
>   // Skip if the filter creation side is too big
>   if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
>     return filterApplicationSidePlan
>   }
>   val rowCount = filterCreationSidePlan.stats.rowCount
>   val bloomFilterAgg =
>     if (rowCount.isDefined && rowCount.get.longValue > 0L) {
>       new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
>         Literal(rowCount.get.longValue))
>     } else {
>       new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
>     }
>   val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
>   val alias = Alias(aggExp, "bloomFilter")()
>   val aggregate =
>     ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
>   val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
>   val filter = BloomFilterMightContain(bloomFilterSubquery,
>     new XxHash64(Seq(filterApplicationSideExp)))
>   Filter(filter, filterApplicationSidePlan)
> }
>  {code}
> This filters poorly when there is a big gap in row count between 
> `filterApplicationSidePlan` and `filterCreationSidePlan`. A smaller row 
> count leads to a smaller bit array in the BloomFilter, and the false positive 
> probability (FPP) increases as the filter is applied to a wider range.
> Here is a comparison using Query 40 on a 10 TB TPC-DS dataset.
> The SQL is:
> {code:sql}
> -- start query 40 in stream 0 using template query40.tpl
> select  
>    w_state
>   ,i_item_id
>   ,sum(case when (cast(d_date as date) < cast ('2000-03-11' as date)) 
>          then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_before
>   ,sum(case when (cast(d_date as date) >= cast ('2000-03-11' as date)) 
>          then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_after
>  from
>    catalog_sales left outer join catalog_returns on
>        (cs_order_number = cr_order_number 
>         and cs_item_sk = cr_item_sk)
>   ,warehouse 
>   ,item
>   ,date_dim
>  where
>      i_current_price between 0.99 and 1.49
>  and i_item_sk          = cs_item_sk
>  and cs_warehouse_sk    = w_warehouse_sk 
>  and cs_sold_date_sk    = d_date_sk
>  and d_date between (cast ('2000-03-11' as date) - interval 30 days)
>                 and (cast ('2000-03-11' as date) + interval 30 days) 
>  group by
>     w_state,i_item_id
>  order by w_state,i_item_id
> limit 100;-- end query 40 in stream 0 using template query40.tpl
>  {code}
> Spark UI: 
> !image-2023-06-01-13-14-07-265.png|width=314,height=315!
> !image-2023-06-01-13-13-21-168.png|width=318,height=317!
> The `subquery` in both graphs comes from the `item` table. The first one has 
> statistics and uses `rowCount` to initialize BloomFilterAggregate, while the 
> second uses the default configuration.
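
The sizing argument in the description above (a smaller row count gives a smaller bit array, which hurts filtering on a much larger application side) can be illustrated with the textbook Bloom filter approximations. This is only a rough sketch: the object `BloomSizing`, its function names, the 0.03 target FPP, and the row counts are hypothetical values chosen for illustration, and the formulas are the standard approximations rather than the exact sizing logic in Spark's `BloomFilterAggregate`.

```scala
object BloomSizing {
  // Textbook optimal bit count for n expected items at target FPP p:
  // m = -n * ln(p) / (ln 2)^2
  def optimalNumBits(n: Long, p: Double): Long =
    math.ceil(-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // Textbook optimal number of hash functions: k = (m / n) * ln 2
  def optimalNumHashes(m: Long, n: Long): Int =
    math.max(1, math.round(m.toDouble / n * math.log(2)).toInt)

  // Approximate FPP with m bits, k hash functions, n inserted items:
  // (1 - e^(-k * n / m))^k
  def expectedFpp(m: Long, k: Int, n: Long): Double =
    math.pow(1.0 - math.exp(-k.toDouble * n / m), k)

  // Expected number of non-matching probe rows that still pass the filter.
  def expectedFalsePositives(nonMatchingProbeRows: Long, fpp: Double): Double =
    nonMatchingProbeRows * fpp

  def main(args: Array[String]): Unit = {
    val creationRows = 1000L       // hypothetical creation-side row count
    val probeRows = 1000000000L    // hypothetical non-matching application-side rows
    val k = 5                      // same hash count in both cases, for comparison

    // Filter sized from the creation-side row count.
    val smallBits = optimalNumBits(creationRows, 0.03)
    // Filter initialized with a 10x bigger row count (hypothetical factor).
    val bigBits = optimalNumBits(creationRows * 10, 0.03)

    for ((label, bits) <- Seq("small" -> smallBits, "big" -> bigBits)) {
      val fpp = expectedFpp(bits, k, creationRows)
      println(s"$label: bits=$bits fpp=$fpp " +
        s"passing=${expectedFalsePositives(probeRows, fpp)}")
    }
  }
}
```

Under these assumptions, the per-row FPP of the smaller filter stays near its target, but the absolute number of rows that slip through scales with the application-side row count, which is why a larger bit array pays off when the gap between the two sides is large.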



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
