Sergey Zhemzhitsky created SPARK-24154:
------------------------------------------
             Summary: AccumulatorV2 loses type information during serialization
                 Key: SPARK-24154
                 URL: https://issues.apache.org/jira/browse/SPARK-24154
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.3.1
         Environment: Scala 2.11, Spark 2.2.0
            Reporter: Sergey Zhemzhitsky

AccumulatorV2 loses type information during serialization. It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164], during the *writeReplace* call:

{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

Because *copyAndReset* returns an instance of the concrete accumulator class rather than the runtime type, it is hardly possible to create new accumulators by adding behaviour to existing ones by means of mix-ins or inheritance (without also overriding *copy*). For example, the following snippet ...

{code:scala}
import java.{lang => jl}

trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
    self.add(v * 3)
  }
}

val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

...
fails with:

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
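The mechanism can be demonstrated without Spark. The sketch below uses illustrative classes (*SimpleLongAcc*, *TripleAcc* — not Spark's actual API) to show how a *copyAndReset* that returns the concrete base class discards a stacked trait, exactly as *writeReplace* does for the copy it sends to executors, and how overriding *copyAndReset* in a subclass is one way to keep the behaviour:

{code:scala}
// Simplified stand-in for an accumulator; NOT Spark's AccumulatorV2.
class SimpleLongAcc {
  private var sum = 0L
  def add(v: Long): Unit = { sum += v }
  def value: Long = sum
  // Like LongAccumulator.copyAndReset, this returns the concrete base class,
  // discarding any traits the original instance was constructed with.
  def copyAndReset(): SimpleLongAcc = new SimpleLongAcc
}

// Stackable modification: triples every added value.
trait TripleCount extends SimpleLongAcc {
  override def add(v: Long): Unit = super.add(v * 3)
}

// Workaround sketch: a subclass that also overrides copyAndReset,
// so the copy retains the tripling behaviour.
class TripleAcc extends SimpleLongAcc {
  override def add(v: Long): Unit = super.add(v * 3)
  override def copyAndReset(): TripleAcc = new TripleAcc
}

object AccDemo {
  def main(args: Array[String]): Unit = {
    val mixed = new SimpleLongAcc with TripleCount
    mixed.add(10)                    // tripled: value is 30
    val lost = mixed.copyAndReset()  // plain SimpleLongAcc: TripleCount is gone
    lost.add(10)                     // NOT tripled: value is 10

    val kept = (new TripleAcc).copyAndReset() // still a TripleAcc
    kept.add(10)                              // tripled: value is 30
    println(s"mixed=${mixed.value} lost=${lost.value} kept=${kept.value}")
  }
}
{code}

This mirrors the reported failure: the executors work with the *copyAndReset* result, so the mixed-in *add* never runs there and the driver sees 55 instead of 165.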