[
https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550398#comment-15550398
]
Ergin Seyfe commented on SPARK-17463:
-------------------------------------
Hi [~zsxwing],
I think this bug is not fully fixed; I still get a ConcurrentModificationException.
JsonProtocol.scala:314 contains the statement below, which is causing the problem:
{code}
JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
{code}
Adding the synchronized keyword should fix the issue. What do you think?
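As a standalone illustration of the idea (this is not Spark code; the object and variable names are made up): copy the Java list into an immutable Scala list inside a block synchronized on the list itself, and mutate under the same lock:
{code}
import scala.collection.JavaConverters._

object SynchronizedCopyDemo {
  def main(args: Array[String]): Unit = {
    val list = new java.util.ArrayList[(String, Long)]()

    // A "task" thread that keeps mutating the list, the way running tasks
    // mutate the updated-block-statuses list behind the accumulator.
    val mutator = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0L
        while (i < 100000L) {
          list.synchronized { list.add(("block_" + i, i)) }
          i += 1
        }
      }
    })
    mutator.start()

    // The "listener" side: taking the copy inside list.synchronized is the
    // proposed fix. Without the synchronized block, asScala.toList iterates
    // the live list and intermittently throws
    // java.util.ConcurrentModificationException, as in the trace below.
    for (_ <- 1 to 1000) {
      val snapshot = list.synchronized { list.asScala.toList }
      snapshot.length // work only with the immutable snapshot afterwards
    }
    mutator.join()
  }
}
{code}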
This is the stack trace:
{code}
java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
	at java.util.ArrayList$Itr.next(ArrayList.java:851)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
	at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
	at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
	at scala.collection.AbstractTraversable.to(Traversable.scala:104)
	at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
	at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
	at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
	at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
	at scala.Option.map(Option.scala:146)
	at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
	at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
	at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
	at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
	at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137)
	at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
	at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:35)
	at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:35)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
	at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:35)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:81)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:66)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:66)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:65)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1244)
	at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:64)
{code}
> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
> Key: SPARK-17463
> URL: https://issues.apache.org/jira/browse/SPARK-17463
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0
> Reporter: Josh Rosen
> Assignee: Shixiong Zhu
> Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:766)
> at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
> at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
> at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> ... 13 more
> {code}
> Even though accumulators aren't thread-safe, they can be read concurrently
> while executor heartbeats are serialized and modified while tasks are
> running. This leads to ConcurrentModificationException errors (and thereby
> to missed heartbeats) or to inconsistent data, since the individual fields
> of a multi-field object might be serialized at different points in time,
> producing inconsistencies in accumulators like LongAccum.
> This seems like a pretty serious issue, but I'm not sure of the best way to
> fix it. An obvious fix would be to properly synchronize all accesses to the
> fields of our accumulators and to synchronize the writeObject and writeKryo
> methods, but this may have an adverse performance impact.
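For what it's worth, the synchronized-writeObject approach described above could look something like this (a hypothetical sketch, not Spark's actual accumulator code; the class and field names are invented for illustration):
{code}
import java.io.{IOException, ObjectOutputStream}

// Hypothetical accumulator-like class showing the proposed approach: every
// mutation, every read, and the custom writeObject all synchronize on the
// same lock, so heartbeat serialization observes a consistent snapshot of
// both fields instead of a torn one.
class SyncLongAccum extends Serializable {
  private var sum: Long = 0L
  private var count: Long = 0L

  def add(v: Long): Unit = synchronized {
    sum += v
    count += 1
  }

  def avg: Double = synchronized {
    if (count == 0L) 0.0 else sum.toDouble / count
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = synchronized {
    // Both fields are written while holding the lock, so sum and count can
    // never be serialized from two different updates.
    out.defaultWriteObject()
  }
}
{code}
The flip side, as noted, is that every task-side add() now contends on the same lock that heartbeat serialization holds.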