[
https://issues.apache.org/jira/browse/SPARK-25597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hannu Kröger updated SPARK-25597:
---------------------------------
Description:
When _SELECT * FROM table LIMIT 1_ is executed, the code generated by
WholeStageCodegenExec iterates the source iterator completely instead of
reading just the first element (per partition). This is a major performance
penalty compared with the expected behavior. It does not happen if
_spark.sql.codegen.wholeStage_ is set to false in the Spark configuration.
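The workaround of turning whole-stage code generation off could be applied
when the session is created. This is a minimal sketch; the master URL and the
application name are placeholders chosen for illustration, not values from
this report:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder master and app name; only the config key comes from the report.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("limit-without-wholestage")
  .config("spark.sql.codegen.wholeStage", "false") // disable whole-stage codegen
  .getOrCreate()
```

On an already running session the same key can be changed with
spark.conf.set("spark.sql.codegen.wholeStage", "false").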
In codegen mode, {{LocalLimitExec}} overrides the {{stopEarly}} method of
{{BufferedRowIterator}} to stop consuming rows once enough rows have been
fetched. However, {{stopEarly}} is only called by code generated by
{{InputAdapter}}, and {{InputAdapter}} is not added to the plan for some reason.
One potential fix would be to check {{stopEarly}} in the {{hasNext}} method of
{{BufferedRowIterator}}. Interestingly, {{next}} does not call {{hasNext}}, so
{{next}} might return an empty result even if there is data to be fetched by
the {{processNext}} method. Another option is to add a {{stopEarly}} check to
the {{shouldStop}} method.
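The effect of checking {{stopEarly}} in {{hasNext}} can be illustrated with a
toy model in plain Scala. This is only a sketch: ToyLimitIterator,
StopEarlyDemo and the checkStopEarly flag are hypothetical names, and the
produce loop is a simplified stand-in, not Spark's BufferedRowIterator or its
generated code.

```scala
import scala.collection.mutable

// Toy model only: method names follow the issue text, but this is not
// Spark's BufferedRowIterator or the code that Spark generates.
final class ToyLimitIterator(source: Iterator[Int], limit: Int, checkStopEarly: Boolean)
    extends Iterator[Int] {
  private val buffer = mutable.Queue.empty[Int]
  private var emitted = 0
  var pulled = 0 // number of rows taken from the source

  // What a limit operator would report once it has produced enough rows.
  private def stopEarly: Boolean = emitted >= limit

  // The produce loop pauses as soon as a row is buffered for the consumer.
  private def shouldStop: Boolean = buffer.nonEmpty

  // Stand-in for the produce loop: rows past the limit are read and dropped.
  private def processNext(): Unit =
    while (!shouldStop && source.hasNext) {
      val row = source.next()
      pulled += 1
      if (emitted < limit) {
        buffer.enqueue(row)
        emitted += 1
      }
    }

  override def hasNext: Boolean = {
    // The proposed extra check: do not ask for more input once the limit is met.
    val skip = checkStopEarly && stopEarly
    if (buffer.isEmpty && !skip) processNext()
    buffer.nonEmpty
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    buffer.dequeue()
  }
}

object StopEarlyDemo {
  // Returns the rows produced and how many rows were pulled from the source.
  def run(checkStopEarly: Boolean): (List[Int], Int) = {
    val it = new ToyLimitIterator(
      Iterator.range(0, 1000), limit = 1, checkStopEarly = checkStopEarly)
    val rows = it.toList
    (rows, it.pulled)
  }

  def main(args: Array[String]): Unit = {
    println(run(checkStopEarly = false)) // (List(0),1000)
    println(run(checkStopEarly = true))  // (List(0),1)
  }
}
```

In this model both variants return the same single row, but without the check
the second {{hasNext}} call drains the remaining 999 source rows. With an
infinite source, as in the test in this report, that second call would never
return.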
This commit might have broken the original functionality:
[https://github.com/apache/spark/commit/b41ec997786e2be42a8a2a182212a610d08b221b]
You can easily test with this:
{code:java}
// Imports are an assumption (Spark 2.3 APIs); the classes live in package "infinite".
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider, TableScan}
import org.apache.spark.sql.types.StructType
import scala.collection.AbstractIterator

// Scala test:
test("iteration with limit") {
  sqlContext.read.format(classOf[infinite.DefaultSource].getPackage.getName)
    .schema("value INTEGER")
    .load()
    .createOrReplaceTempView("numbers")

  sqlContext.sql("SELECT * FROM numbers LIMIT 1").collect().length shouldBe 1
}

// Data source entry point, resolved by package name in the test above.
class DefaultSource extends SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    new InfiniteRelation(sqlContext, schema)
  }
}

// Single-partition RDD backed by an endless stream of integers.
class InfiniteRDD(sc: SparkContext) extends RDD[Row](sc, Seq.empty) {
  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    new PrintingIterator(Stream.from(0).map(i => Row(i)).iterator)
  }

  override protected def getPartitions: Array[Partition] = {
    Array(new Partition {
      override def index: Int = 0
    })
  }
}

class InfiniteRelation(override val sqlContext: SQLContext, val schema: StructType)
  extends BaseRelation with TableScan {
  override def buildScan(): RDD[Row] = new InfiniteRDD(sqlContext.sparkContext)
}

// Logs every element pulled from the source, so over-consumption is visible.
class PrintingIterator[T](backingIterator: Iterator[T]) extends AbstractIterator[T] {
  override def hasNext: Boolean = {
    backingIterator.hasNext
  }

  override def next(): T = {
    val nextValue = backingIterator.next()
    println(s"Printing Iterator is being called. Value: ${nextValue}")
    nextValue
  }
}
{code}
> SQL query with limit iterates the whole iterator when WholeStage code
> generation is enabled
> -------------------------------------------------------------------------------------------
>
> Key: SPARK-25597
> URL: https://issues.apache.org/jira/browse/SPARK-25597
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Hannu Kröger
> Priority: Major
>