[
https://issues.apache.org/jira/browse/SPARK-21999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Michael N updated SPARK-21999:
------------------------------
Description:
Hi,
I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting
ConcurrentModificationException intermittently. When it occurs, Spark does not
honor the specified value of spark.task.maxFailures. So Spark aborts the
current batch and fetch the next batch, so it results in lost data. Its
exception stack is listed below.
This instance of ConcurrentModificationException is similar to the issue at
https://issues.apache.org/jira/browse/SPARK-17463, which was about
Serialization of accumulators in heartbeats. However, my Spark stream app does
not use accumulators.
There are two stack traces below. The first one is from the Spark streaming
driver at the time of data loss. The second one is from the Spark slave
server that runs the application several hours later. It may or may not be
related. They are listed here because they involve the same type of
Because this issue results in lost data as described above, could you have this
issue fixed ASAP ?
Thanks.
Michael N.,
----------------
Stack trace of Spark Streaming driver
ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at scala.Option.map(Option.scala:146)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 60 more
2017-09-12 07:02:10.029 ERROR
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at scala.Option.map(Option.scala:146)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 60 more
===
The stack trace on the Spark slave node occurred multiple hours later and is
listed below. It is listed here for completeness in case it is another
variation of the same issue.
Stack trace on the slave node
NettyRpcEndpointRef: Error sending message [message =
Heartbeat(3,[Lscala.Tuple2;@54fd4cc8,BlockManagerId(3, <server name here>,
43576, None))] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at
java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
at
org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:507)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
... 13 more
----
was:
Hi,
I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting
ConcurrentModificationException intermittently. When it occurs, Spark does not
honor the specified value of spark.task.maxFailures. So Spark aborts the
current batch and fetch the next batch, so it results in lost data. Its
exception stack is listed below.
This instance of ConcurrentModificationException is similar to the issue at
https://issues.apache.org/jira/browse/SPARK-17463, which was about
Serialization of accumulators in heartbeats. However, my Spark stream app does
not use accumulators.
There are two stack traces below. The first one is from the Spark slave server
that runs the application. The second one is from the Spark streaming driver.
The first stack trace references "Error sending message [message = Heartbeat".
From the lines of code in the trace, which object in the Heartbeat was Spark
trying to serialize ?
Because this issue results in lost data as described above, could you have this
issue fixed ASAP ?
Thanks.
Michael N.,
----------------
Stack trace on the slave node
NettyRpcEndpointRef: Error sending message [message =
Heartbeat(3,[Lscala.Tuple2;@54fd4cc8,BlockManagerId(3, <server name here>,
43576, None))] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at
java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
at
org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:507)
at
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
... 13 more
----
Stack trace of Spark Streaming driver
ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at scala.Option.map(Option.scala:146)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 60 more
2017-09-12 07:02:10.029 ERROR
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
at scala.Option.map(Option.scala:146)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at
org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 60 more
> ConcurrentModificationException - Error sending message [message = Heartbeat
> ----------------------------------------------------------------------------
>
> Key: SPARK-21999
> URL: https://issues.apache.org/jira/browse/SPARK-21999
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Michael N
> Priority: Critical
>
> Hi,
> I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting
> ConcurrentModificationException intermittently. When it occurs, Spark does
> not honor the specified value of spark.task.maxFailures. So Spark aborts the
> current batch and fetch the next batch, so it results in lost data. Its
> exception stack is listed below.
> This instance of ConcurrentModificationException is similar to the issue at
> https://issues.apache.org/jira/browse/SPARK-17463, which was about
> Serialization of accumulators in heartbeats. However, my Spark stream app
> does not use accumulators.
> There are two stack traces below. The first one is from the Spark streaming
> driver at the time of data loss. The second one is from the Spark slave
> server that runs the application several hours later. It may or may not be
> related. They are listed here because they involve the same type of
> Because this issue results in lost data as described above, could you have
> this issue fixed ASAP ?
> Thanks.
> Michael N.,
> ----------------
> Stack trace of Spark Streaming driver
> ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> at scala.util.Try$.apply(Try.scala:192)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:766)
> at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> ... 60 more
> 2017-09-12 07:02:10.029 ERROR
> org.apache.spark.SparkException: Task not serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
> at scala.Option.map(Option.scala:146)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> at scala.util.Try$.apply(Try.scala:192)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:766)
> at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
> ... 60 more
> ===
> The stack trace on the Spark slave node occurred multiple hours later and is
> listed below. It is listed here for completeness in case it is another
> variation of the same issue.
> Stack trace on the slave node
> NettyRpcEndpointRef: Error sending message [message =
> Heartbeat(3,[Lscala.Tuple2;@54fd4cc8,BlockManagerId(3, <server name here>,
> 43576, None))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
> at
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
> at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject(ArrayList.java:766)
> at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
> at
> java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
> at
> org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
> at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
> at
> org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:507)
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
> ... 13 more
> ----
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]