[ https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16340521#comment-16340521 ]

Saisai Shao commented on SPARK-23187:
-------------------------------------

I'm going to close this JIRA, as there is no issue with reporting accumulators via 
heartbeat: according to my verification, they are reported periodically as expected. 
Please feel free to reopen it if you run into further issues.
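
A minimal sketch of this kind of verification (the master URL, app name, and data 
sizes below are placeholder values): register a LongAccumulator, run a small job, 
and read the merged value back on the driver.

{code}
// Sketch: confirm that task-side accumulator updates reach the driver.
import org.apache.spark.{SparkConf, SparkContext}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("accCheck"))
val acc = sc.longAccumulator("testAcc")

// Each task adds to the accumulator; the updates are merged back on the
// driver with task results and periodic executor heartbeats.
sc.parallelize(1 to 1000, numSlices = 4).foreach(_ => acc.add(1))

println(acc.value)  // prints 1000 on the driver
sc.stop()
{code}

Updates from still-running tasks are also delivered periodically through the 
executor heartbeat path that this JIRA refers to.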

> Accumulator object can not be sent from Executor to Driver
> ----------------------------------------------------------
>
>                 Key: SPARK-23187
>                 URL: https://issues.apache.org/jira/browse/SPARK-23187
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1, 2.3.0
>            Reporter: Lantao Jin
>            Priority: Major
>
> In Executor.scala's reportHeartBeat(), task metrics values cannot be sent to the 
> Driver (on the receiving side all values are zero).
> I wrote a unit test to illustrate this:
> {code}
> diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
>      assertRequestMessageEquals(
>        msg3,
>        RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +    val acc = new LongAccumulator
> +    val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("testAcc"));
> +    sc.register(acc, "testAcc")
> +    acc.setValue(11111)
> +//    val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//    assertRequestMessageEquals(
> +//      msg4,
> +//      RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +    val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +    accbuf += acc
> +    val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +    assertRequestMessageEquals(
> +      msg5,
> +      RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>    }
>  }
> {code}
> Both msg4 (if uncommented) and msg5 fail: on the receiving side the deserialized 
> accumulator values are all zero (see the round-trip sketch below).
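> For illustration, a standalone Java-serialization round trip shows the same 
> symptom without the RPC layer. The sketch assumes Spark 2.x AccumulatorV2 
> semantics, where a registered driver-side accumulator is replaced by a 
> copyAndReset() copy when it is serialized; under that assumption the zeros come 
> from the accumulator's serialization contract rather than from the heartbeat 
> reporting path.
> {code}
> import java.io._
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.util.LongAccumulator
>
> val sc = SparkContext.getOrCreate(
>   new SparkConf().setMaster("local").setAppName("testAcc"))
> val acc = new LongAccumulator
> sc.register(acc, "testAcc")
> acc.setValue(11111)
>
> // Round-trip the registered, driver-side accumulator through Java
> // serialization, roughly what embedding it in an RPC message does.
> val bos = new ByteArrayOutputStream()
> val oos = new ObjectOutputStream(bos)
> oos.writeObject(acc)
> oos.close()
> val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
>   .readObject().asInstanceOf[LongAccumulator]
>
> println(acc.value)   // 11111 on the driver side
> println(copy.value)  // 0 after the round trip, matching the zeros seen above
> {code}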


