[ https://issues.apache.org/jira/browse/SPARK-32628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179506#comment-17179506 ]
Yuming Wang edited comment on SPARK-32628 at 8/18/20, 11:48 PM:
----------------------------------------------------------------
{code:scala}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.sql.catalyst.expressions.aggregate.BuildBloomFilter
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

val N = 1000000000L
val path = "/tmp/spark/bloomfilter"
spark.range(N).write.mode("overwrite").parquet(path)

val benchmark = new Benchmark("Benchmark build bloom filter", valuesPerIteration = N, minNumIters = 2)
Seq(1000L, 100000L, 10000000L, 30000000L, 50000000L, 70000000L, 100000000L, 200000000L).foreach { items =>
  val df = spark.read.parquet(path).filter(s"id < ${items}")
  // Build the Bloom filter on the executors through the aggregate expression.
  benchmark.addCase(s"Build bloom filter using UDAF with ${items} items") { _ =>
    df.select(new Column(BuildBloomFilter(col("id").expr, items).toAggregateExpression())).collect()
  }
  if (items < 100000000L) {
    // Baseline: collect all values to the driver and build the Bloom filter there.
    benchmark.addCase(s"Collect ${items} items and build bloom filter") { _ =>
      val br = BloomFilter.create(items)
      df.collect().foreach(r => br.putLong(r.getLong(0)))
    }
  }
}
benchmark.run()
{code}
bin/spark-shell --conf spark.driver.maxResultSize=8G --conf spark.driver.memory=16G --conf spark.executor.memory=15G --conf spark.executor.cores=4 --conf spark.executor.instances=10
Benchmark result:
{noformat}
Java HotSpot(TM) 64-Bit Server VM 1.8.0_121-b13 on Linux 3.10.0-957.10.1.el7.x86_64
Intel Core Processor (Broadwell)
Benchmark build bloom filter:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------
Build bloom filter using UDAF with 1000 items                   842           1511         945      1187.0           0.8       2.0X
Collect 1000 items and build bloom filter                       640            818         190      1561.5           0.6       2.6X
Build bloom filter using UDAF with 100000 items                1086           1350         373       921.0           1.1       1.6X
Collect 100000 items and build bloom filter                     810           1296         688      1235.0           0.8       2.1X
Build bloom filter using UDAF with 10000000 items              6265           6403         196       159.6           6.3       0.3X
Collect 10000000 items and build bloom filter                 11457          12787        1881        87.3          11.5       0.1X
Build bloom filter using UDAF with 30000000 items             16001          17154        1631        62.5          16.0       0.1X
Collect 30000000 items and build bloom filter                 39385          53707        2091        25.4          39.4       0.0X
Build bloom filter using UDAF with 50000000 items             27009          27375         518        37.0          27.0       0.1X
Collect 50000000 items and build bloom filter                109192         111757         NaN         9.2         109.2       0.0X
Build bloom filter using UDAF with 70000000 items             26540          32454         NaN        37.7          26.5       0.1X
Collect 70000000 items and build bloom filter                111459         141487         NaN         9.0         111.5       0.0X
Build bloom filter using UDAF with 100000000 items            27460          28696        1748        36.4          27.5       0.1X
Build bloom filter using UDAF with 200000000 items            46415          47788        1941        21.5          46.4       0.0X
{noformat}
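For context, a rough sketch (not part of the benchmark above) of how a filter built this way could be applied to prune the other side of the join. The {{partitionedDf}} name and the {{id}} key column are hypothetical, and in the actual proposal the pruning would happen inside the optimizer rather than through a user-level UDF:
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical illustration: `bf` is a Bloom filter built as above and
// `partitionedDf` is the side of the join we want to prune.
def pruneWithBloomFilter(bf: BloomFilter, partitionedDf: DataFrame): DataFrame = {
  val mightContain = udf((key: Long) => bf.mightContainLong(key))
  // Keys the filter definitely has not seen can be skipped; false positives
  // only cost extra scanning, never correctness.
  partitionedDf.filter(mightContain(col("id")))
}
{code}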
> Use bloom filter to improve dynamicPartitionPruning
> ---------------------------------------------------
>
> Key: SPARK-32628
> URL: https://issues.apache.org/jira/browse/SPARK-32628
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Yuming Wang
> Priority: Major
>
> It throws an exception when {{spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly}} is disabled:
> {code:sql}
> select catalog_sales.* from catalog_sales join catalog_returns where
> cr_order_number = cs_sold_date_sk and cr_returned_time_sk < 40000;
> {code}
> {noformat}
> 20/08/16 06:44:42 ERROR TaskSetManager: Total size of serialized results of 494 tasks (1225.3 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
> {noformat}
> We can improve this by collecting only the minimum, the maximum, and a Bloom filter of the join keys instead of the raw values, which reduces the size of the serialized results.
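A rough sketch of that idea using only public APIs, following the query above (the expected-items and false-positive-rate values are illustrative): instead of collecting every join key to the driver, collect a min/max range plus a compact Bloom filter.
{code:scala}
import org.apache.spark.sql.functions.{min, max}
import org.apache.spark.util.sketch.BloomFilter

// Filtering side of the join from the query above.
val filtering = spark.table("catalog_returns").filter("cr_returned_time_sk < 40000")

// Only three small results reach the driver: a minimum, a maximum, and the
// merged Bloom filter bitmap, instead of 1225 MiB of raw rows.
val range = filtering.agg(min("cr_order_number"), max("cr_order_number")).head
val bf: BloomFilter = filtering.stat.bloomFilter("cr_order_number", 10000000L, 0.03)
{code}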