Repository: spark
Updated Branches:
  refs/heads/master ed0c57e10 -> 0e318acd0


[SPARK-25901][CORE] Use only one thread in BarrierTaskContext companion object

## What changes were proposed in this pull request?

The `BarrierTaskContext` companion object now holds a single `timer` (and thus a
single backing thread) instead of one `timer` per instance; each
`BarrierTaskContext` instance adds its `timerTasks` to that shared `timer`.
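
The pattern can be sketched in isolation as follows. This is a minimal illustration, not the Spark source: `Waiter`, `timerThreadName`, and the timer name `shared-waiter-timer` are hypothetical, and the timer is created as a daemon here (an assumption of this sketch) so that the example JVM can exit.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.{CompletableFuture, TimeUnit}

// Hypothetical stand-in for BarrierTaskContext; not the Spark class.
class Waiter {
  import Waiter._ // brings the companion's private `timer` into scope

  // Runs a one-shot task on the shared timer and returns the name of the
  // thread that executed it.
  def timerThreadName(): String = {
    val result = new CompletableFuture[String]()
    val task = new TimerTask {
      override def run(): Unit = {
        result.complete(Thread.currentThread().getName)
        ()
      }
    }
    timer.schedule(task, 0L)
    result.get(5, TimeUnit.SECONDS)
  }
}

object Waiter {
  // One Timer, and therefore one backing thread, for all instances.
  // The daemon flag is an assumption of this sketch, not taken from the patch.
  private val timer = new Timer("shared-waiter-timer", true)
}
```

Calling `new Waiter().timerThreadName()` on two different instances should return the same thread name, since both tasks run on the single timer thread.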

## How was this patch tested?

This was tested manually by generating logs and checking that they look the
same as before: a partition that waits on another partition for 5 seconds
generates 4-5 log messages when the logging interval is set to 1 second.
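
One way to reproduce that kind of observation outside Spark is sketched below. It is not the procedure the author used: `PeriodicLogSketch`, `countLogsWhileWaiting`, and the message text are hypothetical, and `Thread.sleep` merely stands in for blocking on another partition.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger

object PeriodicLogSketch {
  // Daemon timer so the example JVM can exit (an assumption of this sketch).
  private val timer = new Timer("periodic-log-sketch", true)

  // Emits one message every `periodMs` while "waiting" for `waitMs`,
  // then cancels the task and returns how many messages were emitted.
  def countLogsWhileWaiting(waitMs: Long, periodMs: Long): Int = {
    val emitted = new AtomicInteger(0)
    val task = new TimerTask {
      override def run(): Unit = {
        val n = emitted.incrementAndGet()
        println(s"still waiting (message $n)")
      }
    }
    // First message after one period, then one message per period.
    timer.schedule(task, periodMs, periodMs)
    try {
      Thread.sleep(waitMs) // stands in for blocking on the other partition
    } finally {
      task.cancel() // stops this task only; the shared timer stays alive
      timer.purge() // drops the cancelled task from the timer's queue
    }
    emitted.get()
  }
}
```

`PeriodicLogSketch.countLogsWhileWaiting(5000L, 1000L)` corresponds to the scenario described above; whether the fifth message appears depends on scheduling jitter around the 5-second mark.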

Closes #22912 from yogeshg/thread.

Authored-by: Yogesh Garg <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e318acd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e318acd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e318acd

Branch: refs/heads/master
Commit: 0e318acd0cc3b42e8be9cb2a53cccfdc4a0805f9
Parents: ed0c57e
Author: Yogesh Garg <[email protected]>
Authored: Sat Nov 3 14:03:50 2018 +0800
Committer: Xingbo Jiang <[email protected]>
Committed: Sat Nov 3 14:03:50 2018 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e318acd/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 90a5c41..7ce421e 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -41,14 +41,14 @@ import org.apache.spark.util._
 class BarrierTaskContext private[spark] (
     taskContext: TaskContext) extends TaskContext with Logging {
 
+  import BarrierTaskContext._
+
   // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
   private val barrierCoordinator: RpcEndpointRef = {
     val env = SparkEnv.get
     RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv)
   }
 
-  private val timer = new Timer("Barrier task timer for barrier() calls.")
-
   // Local barrierEpoch that identify a barrier() call from current task, it shall be identical
   // with the driver side epoch.
   private var barrierEpoch = 0
@@ -234,4 +234,7 @@ object BarrierTaskContext {
   @Experimental
   @Since("2.4.0")
   def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]
+
+  private val timer = new Timer("Barrier task timer for barrier() calls.")
+
 }

