[ https://issues.apache.org/jira/browse/SPARK-43908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-43908:
-----------------------------------
Labels: pull-request-available (was: )
> use bigger rowCount to initialize BloomFilterAggregate in InjectRuntimeFilter
> -----------------------------------------------------------------------------
>
> Key: SPARK-43908
> URL: https://issues.apache.org/jira/browse/SPARK-43908
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Yann Byron
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2023-06-01-13-13-21-168.png,
> image-2023-06-01-13-14-07-265.png
>
>
> Currently, the `InjectRuntimeFilter` rule always uses the row count of
> `filterCreationSidePlan` to initialize `BloomFilterAggregate`, as shown below:
> {code:scala}
> private def injectBloomFilter(
>     filterApplicationSideExp: Expression,
>     filterApplicationSidePlan: LogicalPlan,
>     filterCreationSideExp: Expression,
>     filterCreationSidePlan: LogicalPlan): LogicalPlan = {
>   // Skip if the filter creation side is too big
>   if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) {
>     return filterApplicationSidePlan
>   }
>   // Only the creation side's row count is consulted when sizing the Bloom filter
>   val rowCount = filterCreationSidePlan.stats.rowCount
>   val bloomFilterAgg =
>     if (rowCount.isDefined && rowCount.get.longValue > 0L) {
>       new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)),
>         Literal(rowCount.get.longValue))
>     } else {
>       // No usable row count: fall back to the default configuration
>       new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
>     }
>   val aggExp = AggregateExpression(bloomFilterAgg, Complete, isDistinct = false, None)
>   val alias = Alias(aggExp, "bloomFilter")()
>   val aggregate =
>     ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan)))
>   val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
>   val filter = BloomFilterMightContain(bloomFilterSubquery,
>     new XxHash64(Seq(filterApplicationSideExp)))
>   Filter(filter, filterApplicationSidePlan)
> }
> {code}
> Always sizing the filter from the creation side alone leads to poor filtering
> when the row counts of `filterApplicationSidePlan` and `filterCreationSidePlan`
> differ greatly. A smaller row count yields a smaller bit array in the Bloom
> filter, which has a higher false-positive probability (FPP); when that filter
> is applied to a much larger application side, the higher FPP lets many more
> non-matching rows through.
> Here is a comparison using TPC-DS query 40 on the 10 TB dataset. The SQL is:
> {code:sql}
> -- start query 40 in stream 0 using template query40.tpl
> select
>    w_state
>   ,i_item_id
>   ,sum(case when (cast(d_date as date) < cast ('2000-03-11' as date))
>         then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_before
>   ,sum(case when (cast(d_date as date) >= cast ('2000-03-11' as date))
>         then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_after
> from
>   catalog_sales left outer join catalog_returns on
>        (cs_order_number = cr_order_number
>         and cs_item_sk = cr_item_sk)
>   ,warehouse
>   ,item
>   ,date_dim
> where
>       i_current_price between 0.99 and 1.49
>   and i_item_sk = cs_item_sk
>   and cs_warehouse_sk = w_warehouse_sk
>   and cs_sold_date_sk = d_date_sk
>   and d_date between (cast ('2000-03-11' as date) - interval 30 days)
>                  and (cast ('2000-03-11' as date) + interval 30 days)
> group by
>   w_state, i_item_id
> order by w_state, i_item_id
> limit 100;
> -- end query 40 in stream 0 using template query40.tpl
> {code}
> Spark UI:
> !image-2023-06-01-13-14-07-265.png|width=314,height=315!
> !image-2023-06-01-13-13-21-168.png|width=318,height=317!
> In both graphs, the `subquery` is built from the `item` table. In the first,
> the `item` table has statistics and its `rowCount` is used to initialize
> `BloomFilterAggregate`; in the second, the default configuration is used
> instead.
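The description argues that a Bloom filter with a smaller bit array has a higher false-positive probability, and that the damage grows with the size of the application side. The sketch below illustrates that argument with the standard approximation FPP ≈ (1 − e^(−k·n/m))^k for n inserted keys, m bits and k hash functions. The sizing rule (a fixed 8 bits per expected item, with k chosen as round(m/n · ln 2)) and all the row counts are hypothetical illustration values, not what `BloomFilterAggregate` or the TPC-DS run actually uses.

```scala
object BloomFilterFpp {
  // Standard approximation of a Bloom filter's false-positive probability after
  // inserting `insertedItems` distinct keys into `numBits` bits with `numHashes` hash functions.
  def fpp(insertedItems: Long, numBits: Long, numHashes: Int): Double =
    math.pow(1.0 - math.exp(-numHashes.toDouble * insertedItems / numBits), numHashes)

  // Hash-function count that minimizes the FPP when the filter is sized for `expectedItems`.
  def optimalNumHashes(expectedItems: Long, numBits: Long): Int =
    math.max(1, math.round(numBits.toDouble / expectedItems * math.log(2)).toInt)

  // Hypothetical sizing rule for this illustration: a fixed number of bits per expected item.
  val BitsPerItem = 8L

  // FPP of a filter sized for `expectedItems` that actually receives `insertedItems` keys.
  def fppForSizing(expectedItems: Long, insertedItems: Long): Double = {
    val numBits = expectedItems * BitsPerItem
    fpp(insertedItems, numBits, optimalNumHashes(expectedItems, numBits))
  }

  def main(args: Array[String]): Unit = {
    val insertedItems = 20000L             // hypothetical distinct keys on the creation side
    val nonMatchingProbeRows = 1000000000L // hypothetical non-matching rows on the application side
    for (expectedItems <- Seq(20000L, 1000000L)) {
      val p = fppForSizing(expectedItems, insertedItems)
      println(f"expectedItems=$expectedItems%d fpp=$p%.2e falsePositives~${p * nonMatchingProbeRows}%.0f")
    }
  }
}
```

With these made-up numbers, the filter sized for 20,000 items has an FPP of roughly 2%, so about 20 million of the billion non-matching rows would survive the filter, while the filter sized for 1,000,000 items (holding the same 20,000 keys) has an FPP on the order of 1e-11.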
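One possible reading of the title's proposal is to size the filter from the larger of the two sides' row-count estimates, clamped to an upper bound. The sketch below models only that choice as a pure function. It assumes the estimates arrive as `Option[BigInt]`, the type of `Statistics.rowCount` in Spark, and treats `None` or non-positive values as unusable, like the `rowCount.isDefined && rowCount.get.longValue > 0L` check in the excerpt above. The helper `chooseBloomFilterRowCount` and its `maxNumItems` parameter are hypothetical; this is not the code from the linked pull request.

```scala
object BloomFilterSizing {
  // Hypothetical helper: picks the row count used to size the Bloom filter.
  // Each side's estimate is optional, mirroring Spark's Statistics.rowCount: Option[BigInt].
  // Returns None when neither side has a usable (positive) estimate; the caller
  // would then fall back to the default configuration.
  def chooseBloomFilterRowCount(
      creationSideRowCount: Option[BigInt],
      applicationSideRowCount: Option[BigInt],
      maxNumItems: Long): Option[Long] = {
    val candidates =
      Seq(creationSideRowCount, applicationSideRowCount).flatten.filter(_ > BigInt(0))
    if (candidates.isEmpty) None
    // Use the bigger estimate, but never exceed the (hypothetical) upper bound.
    else Some(candidates.max.min(BigInt(maxNumItems)).toLong)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative numbers only, not taken from the TPC-DS run above.
    val cap = 4000000L
    println(chooseBloomFilterRowCount(Some(BigInt(20000)), Some(BigInt(1000000000L)), cap))
    println(chooseBloomFilterRowCount(Some(BigInt(20000)), None, cap))
    println(chooseBloomFilterRowCount(None, None, cap))
  }
}
```

Clamping matters because the application side can be orders of magnitude larger than the creation side; without a bound, the bit array (and the broadcast scalar subquery result) could grow without limit.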
--
This message was sent by Atlassian Jira (v8.20.10#820010)