[ 
https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371377#comment-16371377
 ] 

LANDAIS Christophe commented on SPARK-19737:
--------------------------------------------

Hello,

While migrating our application from Spark 2.1.1 to Spark 2.2.1, we see a major 
degradation in Spark SQL timing. One insert takes 5 seconds in 2.1.1 and 75 
seconds in 2.2.1. Looking at the executor traces (I forced the configuration to 
one executor), we see that the time is spent between the call to 
spark.sql("insert into") and the submission of the task to the executor.

My application traces:

2018-02-21 06:30:53 - Executor[1] Going to execute request …

2018-02-21 06:32:08 - Executor[1] request executed (tag: NO_TAG) (table: 
ca4mn.sys_4g_pcmd_mme_15min) (date: 20180221061500) - duration (s)  74.846

 

Executor trace:

18/02/21 06:30:52 INFO Executor: Finished task 0.0 in stage 3.0 (TID 1). 4675 bytes result sent to driver  (Landais note: this is the previous task, which has just terminated)

18/02/21 06:32:06 INFO CoarseGrainedExecutorBackend: Got assigned task 2

 

What is Spark doing between 06:30:53 and 06:32:06? I took several thread dumps 
in the container while execution was in progress, with a delay of 2 seconds 
between dumps. They are identical. The thread dump is included at the end of 
this comment.

The thread dump shows that the time is spent verifying that functions exist: 
this is the SPARK-19737 modification.

My SQL request contains 1000 function calls because we are aggregating many 
columns. The functions are the usual aggregates such as MAX, MIN, etc.

 

Please, could you modify this check to make it faster? For example, by 
performing only one check per distinct function name, or by introducing a Spark 
parameter to bypass the check entirely?
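To illustrate the suggestion, here is a minimal sketch in plain Python (not Spark code; `check_function_exists` is a hypothetical stand-in for the expensive metastore round trip) showing how checking each distinct function name only once would cut the number of remote calls from one per invocation to one per distinct function:

```python
# Sketch: deduplicating existence checks for function names.
# check_function_exists is a hypothetical stand-in for the expensive
# remote metastore check triggered per function invocation.

calls = 0

def check_function_exists(name):
    """Simulate one remote metastore round trip."""
    global calls
    calls += 1
    return name.upper() in {"MAX", "MIN", "SUM", "AVG"}

# A query with 1000 function invocations but only 4 distinct functions.
invocations = ["max", "min", "sum", "avg"] * 250

# Naive approach: one metastore check per invocation.
calls = 0
naive = [check_function_exists(f) for f in invocations]
naive_calls = calls

# Deduplicated approach: one check per distinct name, then reuse.
calls = 0
cache = {name: check_function_exists(name) for name in set(invocations)}
dedup = [cache[f] for f in invocations]
dedup_calls = calls

print(naive_calls, dedup_calls)  # 1000 4
```

With metastore round trips in the tens of milliseconds, the difference between 1000 calls and 4 calls is exactly the kind of 70-second gap observed above.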

----------------

Thread dump

"Executor[1]" #95 prio=5 os_prio=0 tid=0x00007f587f355800 nid=0x7c runnable [0x00007f57549f7000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        - locked <0x000000008913b110> (a java.io.BufferedInputStream)
        at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
        at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
        at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
        at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_database(ThriftHiveMetastore.java:654)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_database(ThriftHiveMetastore.java:641)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1158)
        at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)  **
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
        at com.sun.proxy.$Proxy31.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1301)
        at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1290)  **
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$databaseExists$1.apply$mcZ$sp(HiveClientImpl.scala:358)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$databaseExists$1.apply(HiveClientImpl.scala:358)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$databaseExists$1.apply(HiveClientImpl.scala:358)
        at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:290)
        at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:231)
        at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:230)
        - locked <0x000000008900dd88> (a org.apache.spark.sql.hive.client.IsolatedClientLoader)
        at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:273)
        at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:357)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
        - locked <0x0000000089037ae0> (a org.apache.spark.sql.hive.HiveExternalCatalog)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:194)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.databaseExists(SessionCatalog.scala:246)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.org$apache$spark$sql$catalyst$catalog$SessionCatalog$$requireDbExists(SessionCatalog.scala:172)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.functionExists(SessionCatalog.scala:1044)
        at org.apache.spark.sql.hive.HiveSessionCatalog.functionExists(HiveSessionCatalog.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1196)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15.applyOrElse(Analyzer.scala:1195)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:309)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressions$1.applyOrElse(QueryPlan.scala:308)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformAllExpressions(QueryPlan.scala:308)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1195)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$.apply(Analyzer.scala:1194)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
        - locked <0x00000000f9c68cd8> (a org.apache.spark.sql.execution.QueryExecution)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:632)
        at com.nokia.rtna.sumrz.engine.JobExecutorTask.run(JobExecutorTask.java:181)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

 

Thanks and BR,

Christophe

> New analysis rule for reporting unregistered functions without relying on 
> relation resolution
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19737
>                 URL: https://issues.apache.org/jira/browse/SPARK-19737
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>            Priority: Major
>             Fix For: 2.2.0
>
>
> Let's consider the following simple SQL query that references an undefined 
> function {{foo}} that is never registered in the function registry:
> {code:sql}
> SELECT foo(a) FROM t
> {code}
> Assuming table {{t}} is a partitioned temporary view consisting of a large 
> number of files stored on S3, it may take the analyzer a long time before 
> realizing that {{foo}} is not registered yet.
> The reason is that the existing analysis rule {{ResolveFunctions}} requires 
> all child expressions to be resolved first. Therefore, {{ResolveRelations}} 
> has to be executed first to resolve all columns referenced by the unresolved 
> function invocation. This further leads to partition discovery for {{t}}, 
> which may take a long time.
> To address this case, we propose a new lightweight analysis rule 
> {{LookupFunctions}} that:
> # Matches all unresolved function invocations
> # Looks up the function names in the function registry
> # Reports an analysis error for any unregistered function
> Since this rule doesn't actually try to resolve the unresolved functions, it 
> doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition 
> discovery.
> We may put this analysis rule in a separate {{Once}} rule batch that sits 
> between the "Substitution" batch and the "Resolution" batch to avoid running 
> it repeatedly and make sure it gets executed before {{ResolveRelations}}.
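
The proposed rule amounts to a cheap pre-pass over the plan: collect every unresolved function name and check it against the registry, without resolving any relation. A minimal sketch in plain Python (hypothetical `Func`/`Column` node classes standing in for Catalyst expressions, not Spark's actual API) of that idea:

```python
# Sketch of a LookupFunctions-style pre-check (hypothetical node classes,
# not Spark's actual Catalyst API): walk the expression tree, collect
# function names, and fail fast on any name missing from the registry.

class Column:
    """Leaf node: a column reference. Resolving it would need the relation,
    but this check never looks at it."""
    def __init__(self, name):
        self.name = name
        self.children = []

class Func:
    """An unresolved function invocation over child expressions."""
    def __init__(self, name, *children):
        self.name = name
        self.children = list(children)

REGISTRY = {"max", "min", "sum"}  # assumed registry contents for the sketch

def lookup_functions(node, missing):
    """Collect unregistered function names; never touches relations."""
    if isinstance(node, Func) and node.name.lower() not in REGISTRY:
        missing.append(node.name)
    for child in node.children:
        lookup_functions(child, missing)
    return missing

# SELECT foo(a) FROM t  ->  foo is reported without ever resolving t.
expr = Func("foo", Column("a"))
print(lookup_functions(expr, []))  # ['foo']
```

Because the check needs only function names, it can run in its own early batch before relation resolution, which is exactly why it avoids partition discovery for {{t}}.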



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
