[ https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462630#comment-16462630 ]
Sergey Zhemzhitsky commented on SPARK-24154:
--------------------------------------------

> If users have to support mixin traits, they can still use accumulator v1.

But accumulators V1 are deprecated and will be removed one day, I believe.

> AccumulatorV2 loses type information during serialization
> ---------------------------------------------------------
>
>                 Key: SPARK-24154
>                 URL: https://issues.apache.org/jira/browse/SPARK-24154
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
>         Environment: Scala 2.11
>                      Spark 2.2.0
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>
> AccumulatorV2 loses type information during serialization.
> It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during the *writeReplace* call:
> {code:scala}
> final protected def writeReplace(): Any = {
>   if (atDriverSide) {
>     if (!isRegistered) {
>       throw new UnsupportedOperationException(
>         "Accumulator must be registered before send to executor")
>     }
>     val copyAcc = copyAndReset()
>     assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
>     val isInternalAcc = name.isDefined &&
>       name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
>     if (isInternalAcc) {
>       // Do not serialize the name of internal accumulator and send it to executor.
>       copyAcc.metadata = metadata.copy(name = None)
>     } else {
>       // For non-internal accumulators, we still need to send the name because users may need to
>       // access the accumulator name at executor side, or they may keep the accumulators sent from
>       // executors and access the name when the registered accumulator is already garbage
>       // collected (e.g. SQLMetrics).
> copyAcc.metadata = metadata > } > copyAcc > } else { > this > } > } > {code} > It means that it is hardly possible to create new accumulators easily by > adding new behaviour to existing ones by means of mix-ins or inheritance > (without overriding *copy*). > For example the following snippet ... > {code:scala} > trait TripleCount { > self: LongAccumulator => > abstract override def add(v: jl.Long): Unit = { > self.add(v * 3) > } > } > val acc = new LongAccumulator with TripleCount > sc.register(acc) > val data = 1 to 10 > val rdd = sc.makeRDD(data, 5) > rdd.foreach(acc.add(_)) > acc.value shouldBe 3 * data.sum > {code} > ... fails with > {code:none} > org.scalatest.exceptions.TestFailedException: 55 was not equal to 165 > at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340) > at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864) > {code} > Also such a behaviour seems to be error prone and confusing because an > implementor gets not the same thing as he/she sees in the code. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org