[ 
https://issues.apache.org/jira/browse/SPARK-21443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16097521#comment-16097521
 ] 

Eyal Zituny commented on SPARK-21443:
-------------------------------------

Disabling this option did indeed fix my problem.
I would suggest adding a log message when such a rule takes more than a
second (or even less), saying that there is an option to disable the rule,
since otherwise it isn't easy to understand what is going on and how it can be
fixed.
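
A minimal sketch of the kind of slow-rule warning suggested above, in plain
Scala. The names SlowRuleWarning, timed, ruleName and thresholdMs are
hypothetical and are not part of Spark; a real change would presumably live in
the optimizer's rule executor and use Spark's own logging.

```scala
// Hypothetical helper, NOT a Spark API: times a rule-like function and
// produces a warning message when it runs for at least `thresholdMs`.
object SlowRuleWarning {
  // Applies `rule` to `plan` and returns the result together with an
  // optional warning. `A` stands in for whatever type the rule transforms.
  def timed[A](ruleName: String, thresholdMs: Long)(rule: A => A)(plan: A): (A, Option[String]) = {
    val start = System.nanoTime()
    val result = rule(plan)
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    val warning =
      if (elapsedMs >= thresholdMs)
        Some(s"Rule $ruleName took $elapsedMs ms; " +
          "check whether a configuration option exists to disable it")
      else None
    (result, warning)
  }
}
```

A caller would log the returned message at warn level whenever it is defined,
which keeps the timing logic independent of any particular logging framework.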
By the way, when I looked for a Spark conf that might help me with this issue,
I went over all the confs starting with "spark.sql.optimizer" and, naturally,
did not notice this one.
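
For readers looking for the setting, a hedged sketch of how such an option could
be set when building the session. This comment does not name the option; the
key spark.sql.constraintPropagation.enabled used here is an assumption about
which option is meant, so verify it against the configuration documentation
for your Spark version. The object name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object DisableConstraintPropagation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("TestFlow")
      .master("local[8]")
      // ASSUMPTION: the comment does not name the option; this key is a
      // guess at the one being discussed. Check it for your Spark version.
      .config("spark.sql.constraintPropagation.enabled", "false")
      .getOrCreate()

    // Read the value back to confirm it was applied to the session.
    println(spark.conf.get("spark.sql.constraintPropagation.enabled"))
    spark.stop()
  }
}
```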

> Very long planning duration for queries with lots of operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-21443
>                 URL: https://issues.apache.org/jira/browse/SPARK-21443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Eyal Zituny
>
> Creating a streaming query with a large number of operations and fields (100+)
> results in a very long query planning phase. In the example below, the
> planning phase took 35 seconds while the actual batch execution took only 1.3
> seconds.
> After some investigation, I found that the root cause is two optimizer rules
> that seem to take most of the planning time:
> InferFiltersFromConstraints and PruneFilters.
> I would suggest the following:
> # fix the inefficient optimizer rules
> # add warn-level logging if a rule has taken more than xx ms
> # allow custom removal of optimizer rules (the opposite of
> spark.experimental.extraOptimizations)
> # reuse query plans (optional) where possible
> The issue can be reproduced with the script below, which simulates the
> scenario:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}
> case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())
> case class Events (eventId: Long, eventName: String, productId: Long) {
>       def this(id: Long) = this(id, s"event$id", id%100)
> }
> object SparkTestFlow {
>       def main(args: Array[String]): Unit = {
>               val spark = SparkSession
>                 .builder
>                 .appName("TestFlow")
>                 .master("local[8]")
>                 .getOrCreate()
>               spark.sqlContext.streams.addListener(new StreamingQueryListener {
>                       override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
>                       override def onQueryProgress(event: QueryProgressEvent): Unit = {
>                               if (event.progress.numInputRows>0) {
>                                       println(event.progress.toString())
>                               }
>                       }
>                       override def onQueryStarted(event: QueryStartedEvent): Unit = {}
>               })
>               
>               import spark.implicits._
>               implicit val  sclContext = spark.sqlContext
>               import org.apache.spark.sql.functions.expr
>               val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L*i))
>               val lookupTable = spark.createDataFrame(seq)
>               val inputData = MemoryStream[Events]
>               inputData.addData((1L to 100L).map(i => new Events(i)))
>               val events = inputData.toDF()
>                 .withColumn("w1", expr("0"))
>                 .withColumn("x1", expr("0"))
>                 .withColumn("y1", expr("0"))
>                 .withColumn("z1", expr("0"))
>               val numberOfSelects = 40 // set to 100+ and the planning takes forever
>               val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events)((df,i) =>{
>                       val arr = df.columns.++(Array(s"w${i-1} + rand() as w$i", s"x${i-1} + rand() as x$i", s"y${i-1} + 2 as y$i", s"z${i-1} +1 as z$i"))
>                       df.selectExpr(arr:_*)
>               })
>               val withJoinAndFilter = dfWithSelectsExpr
>                 .join(lookupTable, expr("productId = pid"))
>                 .filter("productId < 50")
>               val query = withJoinAndFilter.writeStream
>                 .outputMode("append")
>                 .format("console")
>                 .trigger(ProcessingTime(2000))
>                 .start()
>               query.processAllAvailable()
>               spark.stop()
>       }
> }
> {code}
> The query progress output will show:
> {code:java}
> "durationMs" : {
>     "addBatch" : 1310,
>     "getBatch" : 6,
>     "getOffset" : 0,
>     "*queryPlanning*" : 36924,
>     "triggerExecution" : 38297,
>     "walCommit" : 33
>   }
> {code}


