[ https://issues.apache.org/jira/browse/SPARK-34934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen reassigned SPARK-34934:
------------------------------------

    Assignee: Harsh Panchal

> Race condition while registering source in MetricsSystem 
> ---------------------------------------------------------
>
>                 Key: SPARK-34934
>                 URL: https://issues.apache.org/jira/browse/SPARK-34934
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2, 3.1.1
>            Reporter: Harsh Panchal
>            Assignee: Harsh Panchal
>            Priority: Minor
>
> {{MetricsSystem}} keeps its metric sources in a {{mutable.ArrayBuffer}}. 
> The {{registerSource}} and {{removeSource}} methods add and remove sources 
> from the metrics system. Neither method is synchronised, and the underlying 
> {{mutable.ArrayBuffer}} is not thread-safe, so calling them concurrently can 
> produce unexpected behaviour.
> Some background:
> We created a custom RpcEndpoint that receives messages from executors 
> and creates new metrics by registering custom sources. These messages are 
> processed concurrently on the driver side, which triggers this issue. 
> The failure can also go unnoticed because {{Inbox}} ignores these exceptions.
> We found this issue in Spark 2.3.2, but it should be present in all later 
> versions.
> For example, we got the exception below due to the race condition:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException
>       at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:104)
>       at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:48)
>       at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:84)
>       at org.apache.spark.metrics.MetricsSystem.registerSource(MetricsSystem.scala:157)
>       * some closure *
>       at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
>       at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
>       at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
>       at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745){noformat}
> A similar exception occurred while shutting down:
> {noformat}
> Exception in thread "stream execution thread for [id = 9a36a08a-f1be-4ad8-b1dd-093f0b53d37d, runId = d668a19c-aced-45c4-963c-c0b93411d1a4]" java.lang.ArrayIndexOutOfBoundsException: 32
>       at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:44)
>       at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:48)
>       at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:195)
>       at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:48)
>       at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:204)
>       at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:48)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:145)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:41)
>       at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:129)
>       at scala.collection.AbstractSeq.indexOf(Seq.scala:41)
>       at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:127)
>       at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:49)
>       at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:167)
>       at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runStream$2.apply(StreamExecution.scala:324)
>       at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>       at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:308)
>       at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){noformat}
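
For illustration only, here is a minimal sketch of one way the race described above could be avoided: keep the sources in a thread-safe collection instead of a {{mutable.ArrayBuffer}}. The names {{DemoSource}}, {{SourceRegistry}} and {{SourceRegistryDemo}} are hypothetical stand-ins and are not Spark classes; this is not necessarily the change applied to {{MetricsSystem}}. It assumes registrations and removals are rare compared to reads, which is the case where a copy-on-write list is a reasonable choice.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for a metrics source; NOT Spark's Source trait.
final case class DemoSource(sourceName: String)

// Hypothetical registry illustrating the idea; NOT Spark's MetricsSystem.
final class SourceRegistry {
  // CopyOnWriteArrayList allows add/remove/iteration from several threads
  // without external locking; every mutation copies the backing array.
  private val sources = new CopyOnWriteArrayList[DemoSource]()

  def registerSource(source: DemoSource): Unit = sources.add(source)

  def removeSource(source: DemoSource): Unit = sources.remove(source)

  def size: Int = sources.size()
}

object SourceRegistryDemo {
  // Registers `threads * perThread` sources from `threads` worker threads
  // and returns how many sources the registry holds afterwards.
  def registerConcurrently(threads: Int, perThread: Int): Int = {
    val registry = new SourceRegistry
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(threads)
    for (t <- 0 until threads) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          try {
            for (i <- 0 until perThread) {
              registry.registerSource(DemoSource(s"src-$t-$i"))
            }
          } finally {
            done.countDown()
          }
        }
      })
    }
    // Wait for all workers to finish before reading the size.
    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    registry.size
  }
}
```

With an unsynchronised {{mutable.ArrayBuffer}} in place of the copy-on-write list, the same concurrent registration may lose entries or throw {{ArrayIndexOutOfBoundsException}} as in the traces above. Marking both methods {{synchronized}} would be an alternative if the buffer type has to stay unchanged.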



