guoyuepeng commented on a change in pull request #504: build and run transform steps with multiple threads
URL: https://github.com/apache/griffin/pull/504#discussion_r299360212
 
 

 ##########
 File path: measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
 ##########
 @@ -0,0 +1,229 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.util.concurrent._
+
+import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
+import scala.util.control.NonFatal
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+
+private[griffin] object ThreadUtils {
+
+  private val sameThreadExecutionContext =
+    ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+  /**
+   * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`.
+   * The caller should make sure the tasks running in this `ExecutionContextExecutor` are short and
+   * never block.
+   */
+  def sameThread: ExecutionContextExecutor = sameThreadExecutionContext
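
For illustration (a hypothetical call site, not part of the diff): a body scheduled on `sameThread` executes on the calling thread, so it has already completed by the time `Future.apply` returns.

```scala
import scala.concurrent.Future

import org.apache.griffin.measure.utils.ThreadUtils

// Runs on the thread that calls Future.apply, not on a pool thread;
// suitable only for short, non-blocking work.
val f: Future[Int] = Future(1 + 1)(ThreadUtils.sameThread)
```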
+
+  /**
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+   */
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+  }
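
A brief sketch of the factory in use (the prefix is illustrative); Guava's `%d` name format is replaced by a counter starting at 0:

```scala
import org.apache.griffin.measure.utils.ThreadUtils

val factory = ThreadUtils.namedThreadFactory("loader")
// Threads come out daemon and named "loader-0", "loader-1", ...
val t = factory.newThread(new Runnable {
  override def run(): Unit = println(Thread.currentThread().getName)
})
t.start() // prints "loader-0"
```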
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
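
A usage sketch (pool name is hypothetical): the unbounded variant grows and shrinks with demand like a plain `newCachedThreadPool`, but with daemon, prefixed threads.

```scala
val cached = ThreadUtils.newDaemonCachedThreadPool("dq-cache")
cached.execute(new Runnable {
  override def run(): Unit = println(Thread.currentThread().getName) // e.g. "dq-cache-0"
})
cached.shutdown()
```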
+
+  /**
+   * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
+   * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use a LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
+  }
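
To make the construction concrete: with core and max sizes both set to `maxThreadNumber`, an unbounded queue, and core-thread timeout enabled, the pool never exceeds `maxThreadNumber` threads, queues the overflow, and reclaims idle threads after `keepAliveSeconds`. A hypothetical sketch:

```scala
// At most 4 threads run concurrently; the remaining tasks wait in the queue.
val bounded = ThreadUtils.newDaemonCachedThreadPool("transform", 4)
(1 to 10).foreach { i =>
  bounded.execute(new Runnable {
    override def run(): Unit = println(s"task $i on ${Thread.currentThread().getName}")
  })
}
bounded.shutdown()
```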
+
+  /**
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
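
A brief sketch of the fixed variant; `Callable` submission works as with any `ExecutorService` (names and the body are illustrative):

```scala
import java.util.concurrent.Callable

val fixed = ThreadUtils.newDaemonFixedThreadPool(2, "worker")
val result = fixed.submit(new Callable[Int] { override def call(): Int = 21 * 2 })
println(result.get()) // 42
fixed.shutdown()
```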
+
+  /**
+   * Wrapper over newSingleThreadExecutor.
+   */
+  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over ScheduledThreadPoolExecutor.
+   */
+  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its delay
+    // elapses. We have to enable it manually.
+    executor.setRemoveOnCancelPolicy(true)
+    executor
+  }
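
For illustration, the remove-on-cancel policy matters when periodic tasks get cancelled; a hypothetical sketch:

```scala
import java.util.concurrent.TimeUnit

val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat")
val tick = scheduler.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = println("tick") },
  0, 1, TimeUnit.SECONDS)
// With setRemoveOnCancelPolicy(true), cancellation also removes the task from
// the work queue immediately instead of waiting out its remaining delay.
tick.cancel(false)
```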
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will look like the following:
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.griffin.measure.utils.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method.
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows method calls from this helper method
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a placeholder just specifying that there
+        // was a helper method used, without any further details of the helper
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using 
${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
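
A sketch of the call site this enables (thread names and bodies are hypothetical): the caller blocks on `join()`, and a failure in the body surfaces here with the spliced trace described above.

```scala
val answer: Int = ThreadUtils.runInNewThread("init-thread") { 21 * 2 }

try {
  ThreadUtils.runInNewThread("failing-thread") {
    throw new IllegalStateException("boom")
  }
} catch {
  // The rethrown exception carries the new thread's frames, the
  // "... run in separate thread ..." placeholder, then the caller's frames.
  case e: IllegalStateException => e.printStackTrace()
}
```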
+
+  /**
+   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+   */
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+    // Custom factory to set thread names
+    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: SForkJoinPool) =
+        new SForkJoinWorkerThread(pool) {
+          setName(prefix + "-" + super.getName)
+        }
+    }
+    new SForkJoinPool(maxThreadNumber, factory,
+      null, // handler
+      false // asyncMode
+    )
+  }
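
One plausible way to consume the pool (names are illustrative): wrap it in an `ExecutionContext` so `Future`s run on the prefixed fork-join workers.

```scala
import scala.concurrent.{ExecutionContext, Future}

val fjPool = ThreadUtils.newForkJoinPool("transform-pool", 8)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(fjPool)

Future {
  // Worker names carry the "transform-pool-" prefix set by the custom factory.
  println(Thread.currentThread().getName)
}
```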
+
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to `Await.result()`.
+   *
+   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
+   * that this thread's stack trace appears in logs.
+   *
+   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+   * `BlockingContext`. Code running in the user's thread may be in a Scala ForkJoinPool thread.
+   * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
+   * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
+   * In general, we should use this method because many places in the codebase use [[ThreadLocal]]
+   * and it's hard to debug when [[ThreadLocal]]s leak to other tasks.
+   */
+  @throws(classOf[Exception])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      // See SPARK-13747.
 
 Review comment:
   ?
