zzc created SPARK-6483:
--------------------------
Summary: Spark SQL UDF (ScalaUdf) is very slow
Key: SPARK-6483
URL: https://issues.apache.org/jira/browse/SPARK-6483
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 1.3.0, 1.4.0
Environment: 1. Spark version is 1.3.0
2. 3 nodes, each with 80 GB memory and 20 cores
3. Reads 250 GB of Parquet files from HDFS
Reporter: zzc
Test cases:
1.
Register a "floor" function with:
sqlContext.udf.register("floor", (ts: Int) => ts - ts % 300)
then run the SQL "select chan, floor(ts) as tt, sum(size) from qlogbase3 group by chan, floor(ts)".
*It takes 17 minutes.*
{quote}
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23500], [chan#23015,PartialGroup#23500 AS tt#23494,CombineSum(PartialSum#23499L) AS c2#23495L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23500], 54)
  Aggregate true, [chan#23015,scalaUDF(ts#23016)], [chan#23015,*scalaUDF*(ts#23016) AS PartialGroup#23500,SUM(size#23023L) AS PartialSum#23499L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[115] at map at newParquet.scala:562
{quote}
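To make the semantics of test case 1 concrete, here is a small plain-Scala sketch of the same grouping over an in-memory collection, outside Spark. The `LogRow` case class, the `FloorQuery` object and the sample rows are hypothetical; only the UDF body and the shape of the query come from this report. It illustrates what the query computes, not how fast Spark computes it.

```scala
// Hypothetical in-memory model of the qlogbase3 rows used in test case 1.
// Only the three columns the query touches are modelled.
final case class LogRow(chan: String, ts: Int, size: Long)

object FloorQuery {
  // Same function body as the "floor" UDF registered in test case 1.
  val floorUdf: Int => Int = (ts: Int) => ts - ts % 300

  // Local equivalent of:
  //   select chan, floor(ts) as tt, sum(size) from qlogbase3
  //   group by chan, floor(ts)
  def run(rows: Seq[LogRow]): Map[(String, Int), Long] =
    rows
      .groupBy(r => (r.chan, floorUdf(r.ts)))
      .map { case (key, group) => key -> group.map(_.size).sum }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      LogRow("a", 10, 1L),
      LogRow("a", 299, 2L),
      LogRow("a", 300, 4L),
      LogRow("b", 610, 8L)
    )
    // Print one line per (chan, tt) group, sorted for stable output.
    run(rows).toSeq.sorted.foreach { case ((chan, tt), total) =>
      println(s"$chan\t$tt\t$total")
    }
  }
}
```

Rows with `ts` 10 and 299 fall into the same 300-second bucket (`tt = 0`), while `ts = 300` starts the next one.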
2.
Run the SQL "select chan, (ts - ts % 300) as tt, sum(size) from qlogbase3 group by chan, (ts - ts % 300)".
*It takes only 5 minutes.*
{quote}
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23349], [chan#23015,PartialGroup#23349 AS tt#23343,CombineSum(PartialSum#23348L) AS c2#23344L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23349], 54)
  Aggregate true, [chan#23015,(ts#23016 - (ts#23016 % 300))], [chan#23015,*(ts#23016 - (ts#23016 % 300))* AS PartialGroup#23349,SUM(size#23023L) AS PartialSum#23348L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[83] at map at newParquet.scala:562
{quote}
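Test cases 1 and 2 compute the same grouping key; the plans differ only in how it is evaluated (a `scalaUDF(ts#23016)` call versus a native arithmetic expression). The plain-Scala sketch below checks that equivalence over a range of inputs. `BucketEquivalence` and `floorTo300` are hypothetical helpers added for illustration. It also shows one semantic caveat: Scala's `%` takes the sign of the dividend, so `ts - ts % 300` rounds toward zero rather than flooring when `ts` is negative. For non-negative timestamps the two agree.

```scala
object BucketEquivalence {
  // Function registered as the "floor" UDF in test case 1.
  val udfBody: Int => Int = (ts: Int) => ts - ts % 300

  // The inline expression from test case 2, written as a plain method.
  def inlineExpr(ts: Int): Int = ts - ts % 300

  // Hypothetical true floor to a 300-second boundary, for comparison only.
  // Math.floorDiv rounds toward negative infinity (Java 8+).
  def floorTo300(ts: Int): Int = Math.floorDiv(ts, 300) * 300

  def main(args: Array[String]): Unit = {
    val samples = Seq(0, 1, 299, 300, 1427155200, -1, -301)
    samples.foreach { ts =>
      println(s"ts=$ts udf=${udfBody(ts)} inline=${inlineExpr(ts)} floor=${floorTo300(ts)}")
    }
  }
}
```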
3.
Use *HiveContext* and run the SQL "select chan, floor((ts - ts % 300)) as tt, sum(size) from qlogbase3 group by chan, floor((ts - ts % 300))".
*It also takes only 5 minutes.*
{quote}
== Physical Plan ==
Aggregate false, [chan#23015,PartialGroup#23108L], [chan#23015,PartialGroup#23108L AS tt#23102L,CombineSum(PartialSum#23107L) AS _c2#23103L]
 Exchange (HashPartitioning [chan#23015,PartialGroup#23108L], 54)
  Aggregate true, [chan#23015,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300)))], [chan#23015,*HiveGenericUdf*#org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor((ts#23016 - (ts#23016 % 300))) AS PartialGroup#23108L,SUM(size#23023L) AS PartialSum#23107L]
   PhysicalRDD [chan#23015,ts#23016,size#23023L], MapPartitionsRDD[28] at map at newParquet.scala:562
{quote}
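Test case 3 wraps the same expression in Hive's `floor`. Because `ts - ts % 300` is already an integer, `floor` leaves its value unchanged; the visible difference in the plan is the trailing `L` on `PartialGroup#23108L`, which marks the grouping key as a long. The sketch below mirrors only those semantics in plain Scala. It is not how `GenericUDFFloor` is implemented, and `HiveFloorKey` is a hypothetical name.

```scala
object HiveFloorKey {
  // Bucketing expression shared by all three test cases.
  def bucket(ts: Int): Int = ts - ts % 300

  // Semantic stand-in for floor((ts - ts % 300)) in test case 3: the argument
  // is already integral, so floor does not change its value. The result is
  // widened to Long, matching the "L" suffix on PartialGroup#23108L.
  // Every Int is exactly representable as a Double, so no precision is lost.
  def hiveStyleKey(ts: Int): Long = math.floor(bucket(ts).toDouble).toLong

  def main(args: Array[String]): Unit =
    Seq(10, 610, 1427155321).foreach { ts =>
      println(s"ts=$ts bucket=${bucket(ts)} hiveStyleKey=${hiveStyleKey(ts)}")
    }
}
```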
*Why is ScalaUdf so much slower, and how can it be improved?*
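As a possible first step toward that question, one could time the bucketing logic outside Spark through two call paths: a generic `Any => Any` function, which boxes the `Int` argument and result, and direct primitive arithmetic. This is a hypothetical harness, built on the assumption that per-row call overhead is worth isolating; the report does not establish that. The sketch does not reproduce Spark's `ScalaUdf` code path, and the JIT may eliminate some of the boxing, so its timings alone cannot explain the 17-minute versus 5-minute gap.

```scala
object CallOverheadSketch {
  // Erased, boxed call path: argument and result travel as Any.
  // Hypothetical stand-in for "call a user function generically";
  // this is not Spark's ScalaUdf implementation.
  val boxed: Any => Any = (v: Any) => {
    val ts = v.asInstanceOf[Int]
    ts - ts % 300
  }

  // Direct primitive arithmetic, like the inline expression in test case 2.
  def direct(ts: Int): Int = ts - ts % 300

  def sumBoxed(n: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < n) {
      acc += boxed(i).asInstanceOf[Int] // box i, call, unbox the result
      i += 1
    }
    acc
  }

  def sumDirect(n: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < n) {
      acc += direct(i)
      i += 1
    }
    acc
  }

  // Returns the result of `body` together with its wall-clock time in ms.
  private def timeMs[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1e6)
  }

  def main(args: Array[String]): Unit = {
    val n = 50000000
    // Warm-up pass so the JIT compiles both loops before timing;
    // it also checks that both paths compute the same total.
    assert(sumBoxed(n) == sumDirect(n))
    val (b, tb) = timeMs(sumBoxed(n))
    val (d, td) = timeMs(sumDirect(n))
    assert(b == d)
    println(f"boxed: $tb%.1f ms, direct: $td%.1f ms")
  }
}
```

Results depend on the JVM version, JIT settings and hardware, so treat any printed timings as indicative only.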
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)