[ https://issues.apache.org/jira/browse/SPARK-32628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179506#comment-17179506 ]

Yuming Wang edited comment on SPARK-32628 at 8/18/20, 11:48 PM:
----------------------------------------------------------------

{code:scala}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
// BuildBloomFilter is the aggregate expression added by the patch for this issue.
import org.apache.spark.sql.catalyst.expressions.aggregate.BuildBloomFilter

val N = 1000000000L
val path = "/tmp/spark/bloomfilter"
spark.range(N).write.mode("overwrite").parquet(path)

val benchmark = new Benchmark("Benchmark build bloom filter", valuesPerIteration = N, minNumIters = 2)

Seq(1000L, 100000L, 10000000L, 30000000L, 50000000L, 70000000L, 100000000L, 200000000L).foreach { items =>
  val df = spark.read.parquet(path).filter(s"id < $items")

  // Case 1: build the Bloom filter with a distributed aggregate (UDAF).
  benchmark.addCase(s"Build bloom filter using UDAF with $items items") { _ =>
    df.select(new Column(BuildBloomFilter(col("id").expr, items).toAggregateExpression())).collect()
  }

  // Case 2: collect every row to the driver and build the filter locally.
  // Skipped for the largest inputs, which exceed spark.driver.maxResultSize.
  if (items < 100000000L) {
    benchmark.addCase(s"Collect $items items and build bloom filter") { _ =>
      val bf = BloomFilter.create(items)
      df.collect().foreach(r => bf.putLong(r.getLong(0)))
    }
  }
}
benchmark.run()
{code}
{noformat}
bin/spark-shell --conf spark.driver.maxResultSize=8G --conf spark.driver.memory=16G --conf spark.executor.memory=15G --conf spark.executor.cores=4 --conf spark.executor.instances=10
{noformat}
Benchmark result:
{noformat}
Java HotSpot(TM) 64-Bit Server VM 1.8.0_121-b13 on Linux 3.10.0-957.10.1.el7.x86_64
Intel Core Processor (Broadwell)
Benchmark build bloom filter:                       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------
Build bloom filter using UDAF with 1000 items                 842           1511         945       1187.0           0.8       2.0X
Collect 1000 items and build bloom filter                     640            818         190       1561.5           0.6       2.6X
Build bloom filter using UDAF with 100000 items              1086           1350         373        921.0           1.1       1.6X
Collect 100000 items and build bloom filter                   810           1296         688       1235.0           0.8       2.1X
Build bloom filter using UDAF with 10000000 items            6265           6403         196        159.6           6.3       0.3X
Collect 10000000 items and build bloom filter               11457          12787        1881         87.3          11.5       0.1X
Build bloom filter using UDAF with 30000000 items           16001          17154        1631         62.5          16.0       0.1X
Collect 30000000 items and build bloom filter               39385          53707        2091         25.4          39.4       0.0X
Build bloom filter using UDAF with 50000000 items           27009          27375         518         37.0          27.0       0.1X
Collect 50000000 items and build bloom filter              109192         111757         NaN          9.2         109.2       0.0X
Build bloom filter using UDAF with 70000000 items           26540          32454         NaN         37.7          26.5       0.1X
Collect 70000000 items and build bloom filter              111459         141487         NaN          9.0         111.5       0.0X
Build bloom filter using UDAF with 100000000 items          27460          28696        1748         36.4          27.5       0.1X
Build bloom filter using UDAF with 200000000 items          46415          47788        1941         21.5          46.4       0.0X
{noformat}
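Above roughly 10M rows the UDAF cases are several times faster than collecting rows to the driver, and only the UDAF cases can run at 100M+ rows (the collect cases were skipped because they exceed spark.driver.maxResultSize). For comparison, Spark's built-in distributed Bloom filter aggregation, {{DataFrameStatFunctions.bloomFilter}}, could serve as a third baseline: it also merges per-partition filters on the executors, so only the finished filter reaches the driver. A minimal sketch, not part of the results above:
{code:scala}
import org.apache.spark.util.sketch.BloomFilter

// Built-in alternative: df.stat.bloomFilter runs as a distributed
// aggregate, so only the merged BloomFilter is returned to the driver.
val items = 10000000L
val df = spark.read.parquet("/tmp/spark/bloomfilter").filter(s"id < $items")
val bf: BloomFilter = df.stat.bloomFilter("id", items, 0.03)  // 0.03 = default false-positive rate
{code}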


> Use bloom filter to improve dynamicPartitionPruning
> ---------------------------------------------------
>
>                 Key: SPARK-32628
>                 URL: https://issues.apache.org/jira/browse/SPARK-32628
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> It throws an exception when {{spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly}} is disabled:
> {code:sql}
> select catalog_sales.* from catalog_sales join catalog_returns where cr_order_number = cs_sold_date_sk and cr_returned_time_sk < 40000;
> {code}
> {noformat}
> 20/08/16 06:44:42 ERROR TaskSetManager: Total size of serialized results of 494 tasks (1225.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
> {noformat}
> We can improve this by using minimum/maximum values and a Bloom filter to reduce the size of the serialized results.
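> A minimal sketch of the general idea (hypothetical, not the actual patch): build a Bloom filter over the join keys of the filtered side with the built-in {{DataFrameStatFunctions.bloomFilter}}, broadcast it, and prune the other side before the join. Table names follow the repro query above; the expected item count and false-positive rate are guesses:
> {code:scala}
> import org.apache.spark.sql.functions.{col, udf}
> 
> val returns = spark.table("catalog_returns").filter("cr_returned_time_sk < 40000")
> 
> // Build a compact Bloom filter of the join keys with a distributed aggregate.
> val bf = returns.stat.bloomFilter("cr_order_number", 1000000L, 0.03)
> 
> // Broadcast the filter instead of collecting whole rows to the driver.
> val bfRef = spark.sparkContext.broadcast(bf)
> val mightContain = udf((k: Long) => bfRef.value.mightContainLong(k))
> 
> // Drop catalog_sales rows that cannot possibly match before the join runs.
> val pruned = spark.table("catalog_sales").filter(mightContain(col("cs_sold_date_sk")))
> {code}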


