Hello all,

I've tried to add some task metrics to org.apache.spark.executor.ShuffleReadMetrics
(ShuffleReadMetrics.scala) in Spark 2.0.2, following the pattern of the
existing metrics, but when I submit an application I get these errors:

ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
   at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:158)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)
   at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
   at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:231)
   at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:458)
   ...
16/12/21 16:16:42 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
   at scala.Option.foreach(Option.scala:257)
   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
   ...
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202401,BlockManagerId(7, 172.18.11.3, 42715),12409896960)
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202445,BlockManagerId(5, 172.18.11.121, 41654),12409896960)
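
For reference, the top frame points at the registration check in AccumulatorV2.writeReplace. As far as I can read the 2.0.2 source, any driver-side accumulator that has no metadata assigned fails exactly like this when Java serialization kicks in (trimmed excerpt, quoted from memory, so please double-check against your tree):

// org.apache.spark.util.AccumulatorV2 (Spark 2.0.2, trimmed excerpt):
// writeReplace is invoked by Java serialization when the task is shipped.
// isRegistered means metadata has been assigned and the accumulator is
// known to AccumulatorContext on the driver.
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    ...  // copyAndReset, attach metadata, send the copy to the executor
  } else {
    this
  }
}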

It looks like the accumulators backing the new task metrics are not
registered before being sent to the executors, but I did add the new
metrics to the nameToAccums map in TaskMetrics.scala:

private[spark] lazy val nameToAccums = LinkedHashMap(
  ...
  // added by txh
  shuffleRead.SHUFFLE_READ_TIME -> shuffleReadMetrics._shuffleReadTime,
  shuffleRead.SHUFFLE_MERGE_TIME -> shuffleReadMetrics._shuffleMergeTime,
  shuffleRead.SHUFFLE_MERGE_MEMORY -> shuffleReadMetrics._shuffleMergeMemory,
  shuffleRead.SHUFFLE_USE_MEMORY -> shuffleReadMetrics._shuffleUseMemory,
  ...
)
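
My understanding of the 2.0.x code is that registration is driven by this same map: when a stage attempt is created, TaskMetrics.register walks nameToAccums and registers each accumulator with the SparkContext, so new entries should normally be picked up automatically (paraphrased from TaskMetrics.scala, please verify against your tree):

// TaskMetrics.scala (Spark 2.0.x, paraphrased): every entry of nameToAccums
// gets metadata assigned here, driven by the map shown above.
private[spark] def register(sc: SparkContext): Unit = {
  nameToAccums.foreach {
    case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
  }
}

If that is right, the exception would suggest that the accumulator instance actually serialized with the task is not one of the instances referenced in the map (for example, a metrics object created elsewhere with new), or that register was never called on the TaskMetrics that owns it.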

What else do I need to add for these new metrics to be registered? Thanks
very much.
ShuffleReadMetrics.scala
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/n20330/ShuffleReadMetrics.scala>
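
In case it helps narrow things down, here is a minimal standalone sketch of the registration contract using only the public API (all names below are mine, not from the patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

// Demo of the registration contract in Spark 2.0.x: an accumulator created
// on the driver can only be serialized into tasks after it is registered.
object AccumulatorRegistrationDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("acc-registration-demo").setMaster("local[2]"))

    // Registered: sc.register assigns metadata, so task serialization works.
    val ok = new LongAccumulator
    sc.register(ok, "demo.registered")
    sc.parallelize(1 to 10).foreach(x => ok.add(x))
    println(s"registered accumulator value = ${ok.value}")  // prints 55

    // Unregistered: shipping this one to executors fails during task
    // serialization; the root cause is the same UnsupportedOperationException.
    val bad = new LongAccumulator
    try {
      sc.parallelize(1 to 10).foreach(x => bad.add(x))
    } catch {
      case e: Exception => println(s"failed as expected: $e")
    }

    sc.stop()
  }
}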
  


