[
https://issues.apache.org/jira/browse/SPARK-8466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael Armbrust updated SPARK-8466:
------------------------------------
Description:
Input Data: a parquet file stored in hdfs://xxxx/data with two columns
(lifeAverageBitrateKbps int, playtimems int)
=================================
Scripts used in spark-shell:
{code}
val df = sqlContext.parquetFile("hdfs://xxxx/data")
import org.apache.spark.sql.types._
val cols = df.schema.fields.map { f =>
val dataType = f.dataType match {
case DoubleType | FloatType => DecimalType.Unlimited
case t => t
}
df.col(f.name).cast(dataType).as(f.name)
}
df.select(cols: _*).registerTempTable("anon_sdm2_ss")
val query =
"""
|SELECT avg(cleanedplaytimems),
| count(1)
|FROM
| (SELECT 0 key,
| avg(lifeAverageBitrateKbps) avgbitrate
| FROM anon_sdm2_ss
| WHERE lifeAverageBitrateKbps > 0) t1,
| (SELECT 0 key,
| lifeAverageBitrateKbps,
| if(playtimems > 0, playtimems, 0) cleanedplaytimems
| FROM anon_sdm2_ss
| WHERE lifeAverageBitrateKbps > 0) t2
|WHERE t1.key=t2.key
| AND t2.lifeAverageBitrateKbps < 0.5 * t1.avgbitrate
""".stripMargin
sqlContext.sql(query).explain(true)
{code}
===========================
Output:
{code}
== Analyzed Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
AS _c1#112L]
Filter ((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) <
(0.5 * avgbitrate#108)))
Join Inner, None
Subquery t1
Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType))
AS avgbitrate#108]
Filter (lifeAverageBitrateKbps#105 > 0)
Subquery anon_sdm2_ss
Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
Subquery t2
Project [0 AS
key#109,lifeAverageBitrateKbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#106
> 0),playtimems#106,0) AS cleanedplaytimems#110]
Filter (lifeAverageBitrateKbps#105 > 0)
Subquery anon_sdm2_ss
Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
== Optimized Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
AS _c1#112L]
Project [cleanedplaytimems#110]
Join Inner, Some(((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105,
DoubleType) < (0.5 * avgbitrate#108))))
Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType))
AS avgbitrate#108]
Project [lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105]
!Filter (lifeAverageBitrateKbps#105 > 0)
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
Project [0 AS key#109,lifeaveragebitratekbps#27 AS
lifeaveragebitratekbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#89
AS playtimems#106 > 0),playtimems#89 AS playtimems#106,0) AS
cleanedplaytimems#110]
!Filter (lifeAverageBitrateKbps#105 > 0)
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
{code}
Note:
The Filter operators prefixed with "!" in the optimized plan are unresolved: after being pushed below the Project, Filter (lifeAverageBitrateKbps#105 > 0) sits directly on the relation, which only produces lifeaveragebitratekbps#27 — attribute #105 is defined by the Project now above it, so the reference can no longer be resolved.
was:
Input Data: a parquet file stored in hdfs://xxxx/data with two columns
(lifeAverageBitrateKbps int, playtimems int)
=================================
Scripts used in spark-shell:
val df = sqlContext.parquetFile("hdfs://xxxx/data")
import org.apache.spark.sql.types._
val cols = df.schema.fields.map { f =>
val dataType = f.dataType match {
case DoubleType | FloatType => DecimalType.Unlimited
case t => t
}
df.col(f.name).cast(dataType).as(f.name)
}
df.select(cols: _*).registerTempTable("anon_sdm2_ss")
val query =
"""
|SELECT avg(cleanedplaytimems),
| count(1)
|FROM
| (SELECT 0 key,
| avg(lifeAverageBitrateKbps) avgbitrate
| FROM anon_sdm2_ss
| WHERE lifeAverageBitrateKbps > 0) t1,
| (SELECT 0 key,
| lifeAverageBitrateKbps,
| if(playtimems > 0, playtimems, 0) cleanedplaytimems
| FROM anon_sdm2_ss
| WHERE lifeAverageBitrateKbps > 0) t2
|WHERE t1.key=t2.key
| AND t2.lifeAverageBitrateKbps < 0.5 * t1.avgbitrate
""".stripMargin
sqlContext.sql(query).explain(true)
===========================
Output:
.....
== Analyzed Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
AS _c1#112L]
Filter ((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType) <
(0.5 * avgbitrate#108)))
Join Inner, None
Subquery t1
Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType))
AS avgbitrate#108]
Filter (lifeAverageBitrateKbps#105 > 0)
Subquery anon_sdm2_ss
Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
Subquery t2
Project [0 AS
key#109,lifeAverageBitrateKbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#106
> 0),playtimems#106,0) AS cleanedplaytimems#110]
Filter (lifeAverageBitrateKbps#105 > 0)
Subquery anon_sdm2_ss
Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
== Optimized Logical Plan ==
Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
AS _c1#112L]
Project [cleanedplaytimems#110]
Join Inner, Some(((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105,
DoubleType) < (0.5 * avgbitrate#108))))
Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105, LongType))
AS avgbitrate#108]
Project [lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105]
!Filter (lifeAverageBitrateKbps#105 > 0)
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
Project [0 AS key#109,lifeaveragebitratekbps#27 AS
lifeaveragebitratekbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#89
AS playtimems#106 > 0),playtimems#89 AS playtimems#106,0) AS
cleanedplaytimems#110]
!Filter (lifeAverageBitrateKbps#105 > 0)
Relation[lifeaveragebitratekbps#27,playtimems#89]
ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
Note:
The Filter operators prefixed with "!" in the optimized plan are unresolved: after being pushed below the Project, Filter (lifeAverageBitrateKbps#105 > 0) sits directly on the relation, which only produces lifeaveragebitratekbps#27 — attribute #105 is defined by the Project now above it, so the reference can no longer be resolved.
> Bug in SQL Optimizer: Unresolved Attribute after pushing Filter below Project
> -----------------------------------------------------------------------------
>
> Key: SPARK-8466
> URL: https://issues.apache.org/jira/browse/SPARK-8466
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.1, 1.4.0
> Reporter: Kai Zeng
> Priority: Critical
>
> Input Data: a parquet file stored in hdfs://xxxx/data with two columns
> (lifeAverageBitrateKbps int, playtimems int)
> =================================
> Scripts used in spark-shell:
> {code}
> val df = sqlContext.parquetFile("hdfs://xxxx/data")
> import org.apache.spark.sql.types._
> val cols = df.schema.fields.map { f =>
> val dataType = f.dataType match {
> case DoubleType | FloatType => DecimalType.Unlimited
> case t => t
> }
> df.col(f.name).cast(dataType).as(f.name)
> }
> df.select(cols: _*).registerTempTable("anon_sdm2_ss")
> val query =
> """
> |SELECT avg(cleanedplaytimems),
> | count(1)
> |FROM
> | (SELECT 0 key,
> | avg(lifeAverageBitrateKbps) avgbitrate
> | FROM anon_sdm2_ss
> | WHERE lifeAverageBitrateKbps > 0) t1,
> | (SELECT 0 key,
> | lifeAverageBitrateKbps,
> | if(playtimems > 0, playtimems, 0) cleanedplaytimems
> | FROM anon_sdm2_ss
> | WHERE lifeAverageBitrateKbps > 0) t2
> |WHERE t1.key=t2.key
> | AND t2.lifeAverageBitrateKbps < 0.5 * t1.avgbitrate
> """.stripMargin
> sqlContext.sql(query).explain(true)
> {code}
> ===========================
> Output:
> {code}
> == Analyzed Logical Plan ==
> Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
> AS _c1#112L]
> Filter ((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105, DoubleType)
> < (0.5 * avgbitrate#108)))
> Join Inner, None
> Subquery t1
> Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105,
> LongType)) AS avgbitrate#108]
> Filter (lifeAverageBitrateKbps#105 > 0)
> Subquery anon_sdm2_ss
> Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
> lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
> Relation[lifeaveragebitratekbps#27,playtimems#89]
> ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
> Subquery t2
> Project [0 AS
> key#109,lifeAverageBitrateKbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#106
> > 0),playtimems#106,0) AS cleanedplaytimems#110]
> Filter (lifeAverageBitrateKbps#105 > 0)
> Subquery anon_sdm2_ss
> Project [CAST(lifeaveragebitratekbps#27, IntegerType) AS
> lifeaveragebitratekbps#105,CAST(playtimems#89, IntegerType) AS playtimems#106]
> Relation[lifeaveragebitratekbps#27,playtimems#89]
> ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
> == Optimized Logical Plan ==
> Aggregate [], [AVG(CAST(cleanedplaytimems#110, LongType)) AS _c0#111,COUNT(1)
> AS _c1#112L]
> Project [cleanedplaytimems#110]
> Join Inner, Some(((key#107 = key#109) && (CAST(lifeAverageBitrateKbps#105,
> DoubleType) < (0.5 * avgbitrate#108))))
> Aggregate [], [0 AS key#107,AVG(CAST(lifeAverageBitrateKbps#105,
> LongType)) AS avgbitrate#108]
> Project [lifeaveragebitratekbps#27 AS lifeaveragebitratekbps#105]
> !Filter (lifeAverageBitrateKbps#105 > 0)
> Relation[lifeaveragebitratekbps#27,playtimems#89]
> ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
> Project [0 AS key#109,lifeaveragebitratekbps#27 AS
> lifeaveragebitratekbps#105,HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf((playtimems#89
> AS playtimems#106 > 0),playtimems#89 AS playtimems#106,0) AS
> cleanedplaytimems#110]
> !Filter (lifeAverageBitrateKbps#105 > 0)
> Relation[lifeaveragebitratekbps#27,playtimems#89]
> ParquetRelation2(WrappedArray(hdfs://xxxx/data),Map(),None,None)
> {code}
> Note:
> The Filter operators prefixed with "!" in the optimized plan are unresolved: after being pushed below the Project, Filter (lifeAverageBitrateKbps#105 > 0) sits directly on the relation, which only produces lifeaveragebitratekbps#27 — attribute #105 is defined by the Project now above it, so the reference can no longer be resolved.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]