[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-23697:
---------------------------------------
    Description: 
I've noticed that accumulators from the Spark 1.x API no longer work with Spark 2.x, failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy

The failure happens while serializing the accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset definitely returns a zero-value copy. Consider the accumulator below:
{code:java}
import java.{lang => jl}
import org.apache.spark.AccumulatorParam

val concatParam = new AccumulatorParam[jl.StringBuilder] {
  // zero: a fresh, empty StringBuilder
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder()
  // addInPlace: concatenate the partial results
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2)
}{code}
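For reference, here is a rough reproduction sketch; it assumes a live SparkContext (sc) and the concatParam defined above, and the job itself is arbitrary:
{code:java}
import java.{lang => jl}

// Register a legacy-style accumulator with the param defined above.
val acc = sc.accumulator(new jl.StringBuilder(), "concat")(concatParam)

// Any job that references the accumulator forces it to be serialized to the
// executors, which is where the assertion fires.
sc.parallelize(Seq("a", "b", "c")).foreach(s => acc += new jl.StringBuilder(s))
{code}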
So Spark treats the zero value as non-zero because of how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
This means the type of the accumulated values must implement equals and hashCode; otherwise isZero is very likely to always return false.
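To illustrate: java.lang.StringBuilder does not override equals/hashCode, so the comparison above falls back to reference equality, and two distinct empty builders are never equal:
{code:java}
import java.{lang => jl}

val a = new jl.StringBuilder()
val b = new jl.StringBuilder()

// StringBuilder inherits equals from Object, so this is reference equality
println(a == b)                   // false, even though both are empty
println(a.toString == b.toString) // true, the contents are equal
{code}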

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed?

If not, is it OK to just override writeReplace for LegacyAccumulatorWrapper to prevent such failures?
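For context, writeReplace is the standard Java serialization hook that lets an object substitute another instance for itself when it is written out. A rough standalone sketch of that mechanism (the Wrapper class below is made up for illustration and is not Spark code):
{code:java}
import java.{lang => jl}

// Made-up class, only to illustrate the writeReplace hook.
class Wrapper(val value: jl.StringBuilder) extends Serializable {
  // Called by ObjectOutputStream via reflection before this object is written;
  // whatever it returns is serialized instead, so the serialized copy can
  // always carry a "zero" (empty) value.
  private def writeReplace(): Any = new Wrapper(new jl.StringBuilder())
}
{code}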


> Accumulators of Spark 1.x no longer work with Spark 2.x
> -------------------------------------------------------
>
>                 Key: SPARK-23697
>                 URL: https://issues.apache.org/jira/browse/SPARK-23697
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0, 2.2.1
>         Environment: Spark 2.2.0
> Scala 2.11
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>


