Repository: spark
Updated Branches:
  refs/heads/master ed0c57e10 -> 0e318acd0

[SPARK-25901][CORE] Use only one thread in BarrierTaskContext companion object

## What changes were proposed in this pull request?

We now use only one `timer` (and thus one backing thread), held in the `BarrierTaskContext` companion object. Each `BarrierTaskContext` instance adds its timer tasks to that shared `timer` instead of creating its own.

## How was this patch tested?

This was tested manually by generating logs and checking that they look the same as before: a partition waiting on another partition for 5 seconds generates 4-5 log messages when the logging frequency is set to 1 second.

Closes #22912 from yogeshg/thread.

Authored-by: Yogesh Garg <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e318acd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e318acd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e318acd

Branch: refs/heads/master
Commit: 0e318acd0cc3b42e8be9cb2a53cccfdc4a0805f9
Parents: ed0c57e
Author: Yogesh Garg <[email protected]>
Authored: Sat Nov 3 14:03:50 2018 +0800
Committer: Xingbo Jiang <[email protected]>
Committed: Sat Nov 3 14:03:50 2018 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e318acd/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 90a5c41..7ce421e 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -41,14 +41,14 @@ import org.apache.spark.util._
 class BarrierTaskContext private[spark] (
     taskContext: TaskContext) extends TaskContext with Logging {
 
+  import BarrierTaskContext._
+
   // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
   private val barrierCoordinator: RpcEndpointRef = {
     val env = SparkEnv.get
     RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv)
   }
 
-  private val timer = new Timer("Barrier task timer for barrier() calls.")
-
   // Local barrierEpoch that identify a barrier() call from current task, it shall be identical
   // with the driver side epoch.
   private var barrierEpoch = 0
@@ -234,4 +234,7 @@ object BarrierTaskContext {
   @Experimental
   @Since("2.4.0")
   def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]
+
+  private val timer = new Timer("Barrier task timer for barrier() calls.")
+
 }
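
For readers unfamiliar with the pattern, here is a minimal standalone sketch of what the diff does: one `java.util.Timer` lives in a companion object, and every instance schedules its own `TimerTask` on it. This is not the Spark code. `WaitingTask`, `startLogging`, `ticks`, and the timer name are hypothetical names made up for illustration, and the daemon flag is an assumption added so the sketch can exit cleanly (the patch itself constructs a non-daemon `Timer`).

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a per-task context; not BarrierTaskContext itself.
class WaitingTask(name: String) {
  // Bring the companion's members into scope, as the patch does with
  // `import BarrierTaskContext._`. A class may access its companion's private members.
  import WaitingTask._

  // Counts how many times this instance's periodic task has fired.
  val ticks = new AtomicInteger(0)

  // Schedules a periodic "still waiting" message on the shared timer and
  // returns the task so the caller can cancel it once the wait is over.
  def startLogging(periodMs: Long): TimerTask = {
    val task = new TimerTask {
      override def run(): Unit = {
        val n = ticks.incrementAndGet()
        println(s"$name is still waiting (message $n)")
      }
    }
    timer.schedule(task, periodMs, periodMs)
    task
  }
}

object WaitingTask {
  // One Timer, and therefore one backing thread, shared by all instances.
  // Assumption: daemon = true so this sketch does not keep the JVM alive.
  private val timer = new Timer("shared-timer", true)
}
```

A caller would create several `WaitingTask` instances, call `startLogging` on each, and call `cancel()` on the returned `TimerTask` when its wait ends. Cancelling a task leaves the shared timer running for the other instances, which is why each instance cancels its own task rather than the timer.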
