[ 
https://issues.apache.org/jira/browse/SPARK-23187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338737#comment-16338737
 ] 

Saisai Shao commented on SPARK-23187:
-------------------------------------

Actually heartbeat report is OK according to my investigation, registered 
accumulator will report back current updates from executors to driver, no need 
to wait task end.

The only thing is that in Spark UI, accumulator will only be displayed when 
task is finished, but for internal metric accumulators they will display in 
live. So I guess it is because UI doesn't display your registered accumulator 
in time, which makes you think that accumulator is not reported in heartbeat.

> Accumulator object can not be sent from Executor to Driver
> ----------------------------------------------------------
>
>                 Key: SPARK-23187
>                 URL: https://issues.apache.org/jira/browse/SPARK-23187
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1, 2.3.0
>            Reporter: Lantao Jin
>            Priority: Major
>
> In the Executor.scala->reportHeartBeat(), task Metrics value can not be sent 
> to Driver (In receive side all values are zero).
> I write an UT for explanation.
> {code}
> diff --git 
> a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala 
> b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> index f9481f8..57fb096 100644
> --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
> @@ -17,11 +17,16 @@
>  package org.apache.spark.rpc.netty
> +import scala.collection.mutable.ArrayBuffer
> +
>  import org.scalatest.mockito.MockitoSugar
>  import org.apache.spark._
>  import org.apache.spark.network.client.TransportClient
>  import org.apache.spark.rpc._
> +import org.apache.spark.util.AccumulatorContext
> +import org.apache.spark.util.AccumulatorV2
> +import org.apache.spark.util.LongAccumulator
>  class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
> @@ -83,5 +88,21 @@ class NettyRpcEnvSuite extends RpcEnvSuite with 
> MockitoSugar {
>      assertRequestMessageEquals(
>        msg3,
>        RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
> +
> +    val acc = new LongAccumulator
> +    val sc = SparkContext.getOrCreate(new 
> SparkConf().setMaster("local").setAppName("testAcc"));
> +    sc.register(acc, "testAcc")
> +    acc.setValue(11111)
> +//    val msg4 = new RequestMessage(senderAddress, receiver, acc)
> +//    assertRequestMessageEquals(
> +//      msg4,
> +//      RequestMessage(nettyEnv, client, msg4.serialize(nettyEnv)))
> +
> +    val accbuf = new ArrayBuffer[AccumulatorV2[_, _]]()
> +    accbuf += acc
> +    val msg5 = new RequestMessage(senderAddress, receiver, accbuf)
> +    assertRequestMessageEquals(
> +      msg5,
> +      RequestMessage(nettyEnv, client, msg5.serialize(nettyEnv)))
>    }
>  }
> {code}
> msg4 and msg5 are all going to failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to