Repository: spark
Updated Branches:
  refs/heads/master f4a3d45e3 -> c399c7f0e


[SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage

## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new 
data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #13718 from zsxwing/SPARK-16002.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c399c7f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c399c7f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c399c7f0

Branch: refs/heads/master
Commit: c399c7f0e485dcfc6cbc343bc246b8adc3f0648c
Parents: f4a3d45
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Jun 21 12:42:49 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 21 12:42:49 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/ManualClock.scala | 18 +++++++++++++++---
 .../datasources/ListingFileCatalog.scala          |  2 +-
 .../datasources/fileSourceInterfaces.scala        |  2 +-
 .../execution/streaming/FileStreamSource.scala    |  8 +++++++-
 .../sql/execution/streaming/StreamExecution.scala |  5 +++++
 .../org/apache/spark/sql/internal/SQLConf.scala   |  9 ++++++++-
 .../apache/spark/sql/streaming/StreamTest.scala   |  5 +++++
 7 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala 
b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index e7a65d7..91a9587 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -26,6 +26,8 @@ package org.apache.spark.util
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
 
+  private var _isWaiting = false
+
   /**
    * @return `ManualClock` with initial time 0
    */
@@ -57,9 +59,19 @@ private[spark] class ManualClock(private var time: Long) 
extends Clock {
    * @return current time reported by the clock when waiting finishes
    */
   def waitTillTime(targetTime: Long): Long = synchronized {
-    while (time < targetTime) {
-      wait(10)
+    _isWaiting = true
+    try {
+      while (time < targetTime) {
+        wait(10)
+      }
+      getTimeMillis()
+    } finally {
+      _isWaiting = false
     }
-    getTimeMillis()
   }
+
+  /**
+   * Returns whether there is any thread being blocked in `waitTillTime`.
+   */
+  def isWaiting: Boolean = synchronized { _isWaiting }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index d96cf1b..f713fde 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -82,7 +82,7 @@ class ListingFileCatalog(
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
-        logInfo(s"Listing $path on driver")
+        logTrace(s"Listing $path on driver")
         Try {
           HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), 
pathFilter)
         }.getOrElse(Array.empty[FileStatus])

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4ac555b..521eb7f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -389,7 +389,7 @@ private[sql] object HadoopFsRelation extends Logging {
   // tasks/jobs may leave partial/corrupted data files there.  Files and 
directories whose name
   // start with "." are also ignored.
   def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): 
Array[FileStatus] = {
-    logInfo(s"Listing ${status.getPath}")
+    logTrace(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Array.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 9886ad0..11bf3c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -120,7 +120,13 @@ class FileStreamSource(
     val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
     val files = catalog.allFiles().map(_.getPath.toUri.toString)
     val endTime = System.nanoTime
-    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
1000000}ms")
+    val listingTimeMs = (endTime.toDouble - startTime) / 1000000
+    if (listingTimeMs > 2000) {
+      // Output a warning when listing files uses more than 2 seconds.
+      logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+    } else {
+      logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms")
+    }
     logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
     files
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bb42a11..1428b97 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -56,6 +57,8 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
+  private val pollingDelayMs = 
sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to 
avoid thread starvation.
    */
@@ -190,6 +193,8 @@ class StreamExecution(
             runBatch()
             // We'll increase currentBatchId after we complete processing 
current batch's data
             currentBatchId += 1
+          } else {
+            Thread.sleep(pollingDelayMs)
           }
           true
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4b8916f..1a9bb6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -534,7 +534,7 @@ object SQLConf {
   val FILE_SINK_LOG_CLEANUP_DELAY =
     SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
       .internal()
-      .doc("How long in milliseconds a file is guaranteed to be visible for 
all readers.")
+      .doc("How long that a file is guaranteed to be visible for all readers.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(60 * 1000L) // 10 minutes
 
@@ -545,6 +545,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val STREAMING_POLLING_DELAY =
+    SQLConfigBuilder("spark.sql.streaming.pollingDelay")
+      .internal()
+      .doc("How long to delay polling new data when no data is available")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10L)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 720ffaf..f949652 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -326,6 +326,11 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
                    "can not advance manual clock when a stream is not running")
             verify(currentStream.triggerClock.isInstanceOf[ManualClock],
                    s"can not advance clock of type 
${currentStream.triggerClock.getClass}")
+            val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+            // Make sure we don't advance ManualClock too early. See 
SPARK-16002.
+            eventually("ManualClock has not yet entered the waiting state") {
+              assert(clock.isWaiting)
+            }
             
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
 
           case StopStream =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to