[ 
https://issues.apache.org/jira/browse/SPARK-25597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hannu Kröger updated SPARK-25597:
---------------------------------
    Description: 
When _SELECT * FROM table LIMIT 1_ is executed, {{WholeStageCodegenExec}} 
generates code that consumes the source iterator completely instead of stopping 
after the first element (per partition). This is a major performance penalty 
compared to the expected behavior. It does not happen if 
_spark.sql.codegen.wholeStage_ is set to false in the SparkContext conf.
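As a workaround, whole-stage code generation can be disabled at runtime. A minimal sketch, assuming an existing {{SparkSession}} named {{spark}} and the {{numbers}} view from the test below:

{code:java}
// Disable whole-stage code generation so the LIMIT takes effect per partition.
spark.conf.set("spark.sql.codegen.wholeStage", "false")

// With codegen off, the scan stops after the first row per partition.
spark.sql("SELECT * FROM numbers LIMIT 1").collect()
{code}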

In codegen mode, {{LocalLimitExec}} overrides the {{stopEarly}} method of 
{{BufferedRowIterator}} to stop consuming rows once enough rows have been 
fetched. However, {{stopEarly}} is only called from code generated by 
{{InputAdapter}}, and {{InputAdapter}} is not added to the plan for some reason.

One potential fix would be to check {{stopEarly}} in the {{hasNext}} method of 
{{BufferedRowIterator}}. An interesting detail is that {{next}} does not call 
{{hasNext}}, so {{next}} might return an empty result even though there is 
still data that {{processNext}} could fetch. Alternatively, a check for 
{{stopEarly}} could be added to the {{shouldStop}} method.
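The proposed {{hasNext}} fix can be illustrated outside of Spark with a small pure-Scala model. The names mirror {{BufferedRowIterator}} ({{stopEarly}}, {{processNext}}), but the classes here are hypothetical, not the actual Spark code:

{code:java}
import scala.collection.mutable

// Hypothetical stand-in for BufferedRowIterator: processNext() buffers rows,
// stopEarly() signals that a limit has been reached.
abstract class LimitedIterator[T] extends Iterator[T] {
  protected val buffer = mutable.Queue.empty[T]
  protected def stopEarly(): Boolean
  protected def processNext(): Unit // may enqueue into `buffer`

  // Proposed fix: consult stopEarly() here, so consumers stop even when the
  // generated plan never checks it.
  override def hasNext: Boolean = {
    if (stopEarly()) return false
    if (buffer.isEmpty) processNext()
    buffer.nonEmpty
  }

  // Note: next() deliberately does not call hasNext(), matching the
  // behavior described above.
  override def next(): T = buffer.dequeue()
}

// A source that would be infinite without the stopEarly() check.
class FirstN(n: Int) extends LimitedIterator[Int] {
  private var produced = 0
  override protected def stopEarly(): Boolean = produced >= n
  override protected def processNext(): Unit = {
    buffer.enqueue(produced)
    produced += 1
  }
}
{code}

With the check in place, {{new FirstN(1).toList}} terminates after one element instead of looping forever.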

This commit might have broken the original functionality: 
[https://github.com/apache/spark/commit/b41ec997786e2be42a8a2a182212a610d08b221b]

You can easily test with this:
{code:java}
// Scala test (assumes a ScalaTest suite with Matchers mixed in):
import scala.collection.AbstractIterator

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider, TableScan}
import org.apache.spark.sql.types.StructType

test("iteration with limit") {
    sqlContext.read.format(
        classOf[infinite.DefaultSource].getPackage.getName)
        .schema("value INTEGER")
        .load()
        .createOrReplaceTempView("numbers")

    sqlContext.sql("SELECT * FROM numbers LIMIT 1").collect().length shouldBe 1
}

class DefaultSource extends SchemaRelationProvider {
    override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
        new InfiniteRelation(sqlContext, schema)
    }
}

class InfiniteRDD(sc: SparkContext) extends RDD[Row](sc, Seq.empty) {
    override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
        new PrintingIterator(Stream.from(0).map(i => Row(i)).iterator)
    }

    override protected def getPartitions: Array[Partition] = {
        Array(new Partition {
            override def index: Int = 0
        })
    }
}

class InfiniteRelation(override val sqlContext: SQLContext, val schema: StructType) extends BaseRelation with TableScan {
    override def buildScan(): RDD[Row] = new InfiniteRDD(sqlContext.sparkContext)
}

class PrintingIterator[T](backingIterator: Iterator[T]) extends AbstractIterator[T] {
    override def hasNext: Boolean = {
        backingIterator.hasNext
    }

    override def next(): T = {
        val nextValue = backingIterator.next()
        println(s"Printing Iterator is being called. Value: ${nextValue}")
        nextValue
    }
}
{code}



> SQL query with limit iterates the whole iterator when WholeStage code 
> generation is enabled
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25597
>                 URL: https://issues.apache.org/jira/browse/SPARK-25597
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Hannu Kröger
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
