[
https://issues.apache.org/jira/browse/SPARK-21443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090242#comment-16090242
]
Kazuaki Ishizaki commented on SPARK-21443:
------------------------------------------
These two optimizations, {{InferFiltersFromConstraints}} and {{PruneFilters}}, are
known to be time-consuming.
Since it is not easy to fix the root cause, the Spark community introduced
an option, {{spark.sql.constraintPropagation.enabled}}, to disable these
optimizations in [this PR|https://github.com/apache/spark/pull/17186].
Is it possible to alleviate the problem by using this option?
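As a minimal sketch of how the option could be tried (assuming the Spark build in use already includes it; the session setup simply mirrors the reproduction script in the issue), it can be set either when the session is built or at runtime before the streaming query is started:
{code:java}
import org.apache.spark.sql.SparkSession

// Option 1: disable constraint propagation when building the session.
val spark = SparkSession
  .builder
  .appName("TestFlow")
  .master("local[8]")
  .config("spark.sql.constraintPropagation.enabled", "false")
  .getOrCreate()

// Option 2: disable it on an existing session, before calling start() on the query.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
{code}
Comparing the {{queryPlanning}} value in the query progress output with and without this setting should show whether it helps in this case.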
> Very long planning duration for queries with lots of operations
> ---------------------------------------------------------------
>
> Key: SPARK-21443
> URL: https://issues.apache.org/jira/browse/SPARK-21443
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Eyal Zituny
>
> Creating a streaming query with a large number of operations and fields (100+)
> results in a very long query planning phase. In the example below, the planning
> phase took 35 seconds while the actual batch execution took only 1.3
> seconds.
> After some investigation, I found that the root causes of this are 2
> optimizer rules which seem to take most of the planning time:
> InferFiltersFromConstraints and PruneFilters
> I would suggest the following:
> # fix the inefficient optimizer rules
> # add warn level logging if a rule has taken more than xx ms
> # allow custom removing of optimizer rules (opposite to
> spark.experimental.extraOptimizations)
> # reuse query plans (optional) where possible
> Reproducing this issue can be done with the script below, which simulates the
> scenario:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}
>
> case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())
>
> case class Events(eventId: Long, eventName: String, productId: Long) {
>   def this(id: Long) = this(id, s"event$id", id % 100)
> }
>
> object SparkTestFlow {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder
>       .appName("TestFlow")
>       .master("local[8]")
>       .getOrCreate()
>
>     // Print the progress report (including durationMs) for every non-empty batch.
>     spark.sqlContext.streams.addListener(new StreamingQueryListener {
>       override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
>       override def onQueryProgress(event: QueryProgressEvent): Unit = {
>         if (event.progress.numInputRows > 0) {
>           println(event.progress.toString())
>         }
>       }
>       override def onQueryStarted(event: QueryStartedEvent): Unit = {}
>     })
>
>     import spark.implicits._
>     // MemoryStream needs an implicit SQLContext in scope.
>     implicit val sqlContext = spark.sqlContext
>     import org.apache.spark.sql.functions.expr
>
>     // Static lookup table that the stream is joined against.
>     val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
>     val lookupTable = spark.createDataFrame(seq)
>
>     val inputData = MemoryStream[Events]
>     inputData.addData((1L to 100L).map(i => new Events(i)))
>     val events = inputData.toDF()
>       .withColumn("w1", expr("0"))
>       .withColumn("x1", expr("0"))
>       .withColumn("y1", expr("0"))
>       .withColumn("z1", expr("0"))
>
>     val numberOfSelects = 40 // set to 100+ and the planning takes forever
>     // Each iteration adds four columns derived from the previous iteration's columns.
>     val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events)((df, i) => {
>       val arr = df.columns ++ Array(
>         s"w${i-1} + rand() as w$i",
>         s"x${i-1} + rand() as x$i",
>         s"y${i-1} + 2 as y$i",
>         s"z${i-1} + 1 as z$i")
>       df.selectExpr(arr: _*)
>     })
>
>     val withJoinAndFilter = dfWithSelectsExpr
>       .join(lookupTable, expr("productId = pid"))
>       .filter("productId < 50")
>
>     val query = withJoinAndFilter.writeStream
>       .outputMode("append")
>       .format("console")
>       .trigger(ProcessingTime(2000))
>       .start()
>     query.processAllAvailable()
>     spark.stop()
>   }
> }
> {code}
> the query progress output will show:
> {code:java}
> "durationMs" : {
> "addBatch" : 1310,
> "getBatch" : 6,
> "getOffset" : 0,
> "*queryPlanning*" : 36924,
> "triggerExecution" : 38297,
> "walCommit" : 33
> }
> {code}