Repository: spark
Updated Branches:
  refs/heads/master f4a3d45e3 -> c399c7f0e
[SPARK-16002][SQL] Sleep when no new data arrives to avoid 100% CPU usage

## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #13718 from zsxwing/SPARK-16002.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c399c7f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c399c7f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c399c7f0

Branch: refs/heads/master
Commit: c399c7f0e485dcfc6cbc343bc246b8adc3f0648c
Parents: f4a3d45
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Jun 21 12:42:49 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Jun 21 12:42:49 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/ManualClock.scala | 18 +++++++++++++++---
 .../datasources/ListingFileCatalog.scala | 2 +-
 .../datasources/fileSourceInterfaces.scala | 2 +-
 .../execution/streaming/FileStreamSource.scala | 8 +++++++-
 .../sql/execution/streaming/StreamExecution.scala | 5 +++++
 .../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++++++-
 .../apache/spark/sql/streaming/StreamTest.scala | 5 +++++
 7 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/core/src/main/scala/org/apache/spark/util/ManualClock.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index e7a65d7..91a9587 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -26,6 +26,8 @@ package org.apache.spark.util */ private[spark] class ManualClock(private var time: Long) extends Clock { + private var _isWaiting = false + /** * @return `ManualClock` with initial time 0 */ @@ -57,9 +59,19 @@ private[spark] class ManualClock(private var time: Long) extends Clock { * @return current time reported by the clock when waiting finishes */ def waitTillTime(targetTime: Long): Long = synchronized { - while (time < targetTime) { - wait(10) + _isWaiting = true + try { + while (time < targetTime) { + wait(10) + } + getTimeMillis() + } finally { + _isWaiting = false } - getTimeMillis() } + + /** + * Returns whether there is any thread being blocked in `waitTillTime`. + */ + def isWaiting: Boolean = synchronized { _isWaiting } } http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index d96cf1b..f713fde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -82,7 +82,7 @@ class ListingFileCatalog( val pathFilter = FileInputFormat.getInputPathFilter(jobConf) val statuses: Seq[FileStatus] = paths.flatMap { path => val fs = path.getFileSystem(hadoopConf) - logInfo(s"Listing $path on driver") + logTrace(s"Listing $path on driver") Try { HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter) }.getOrElse(Array.empty[FileStatus]) 
http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 4ac555b..521eb7f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -389,7 +389,7 @@ private[sql] object HadoopFsRelation extends Logging { // tasks/jobs may leave partial/corrupted data files there. Files and directories whose name // start with "." are also ignored. def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = { - logInfo(s"Listing ${status.getPath}") + logTrace(s"Listing ${status.getPath}") val name = status.getPath.getName.toLowerCase if (shouldFilterOut(name)) { Array.empty http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 9886ad0..11bf3c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -120,7 +120,13 @@ class FileStreamSource( val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().map(_.getPath.toUri.toString) val endTime = System.nanoTime - 
logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms") + val listingTimeMs = (endTime.toDouble - startTime) / 1000000 + if (listingTimeMs > 2000) { + // Output a warning when listing files uses more than 2 seconds. + logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms") + } else { + logTrace(s"Listed ${files.size} file(s) in $listingTimeMs ms") + } logTrace(s"Files are:\n\t" + files.mkString("\n\t")) files } http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bb42a11..1428b97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} @@ -56,6 +57,8 @@ class StreamExecution( import org.apache.spark.sql.streaming.StreamingQueryListener._ + private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY) + /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. 
*/ @@ -190,6 +193,8 @@ class StreamExecution( runBatch() // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 + } else { + Thread.sleep(pollingDelayMs) } true } else { http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4b8916f..1a9bb6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -534,7 +534,7 @@ object SQLConf { val FILE_SINK_LOG_CLEANUP_DELAY = SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay") .internal() - .doc("How long in milliseconds a file is guaranteed to be visible for all readers.") + .doc("How long that a file is guaranteed to be visible for all readers.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(60 * 1000L) // 10 minutes @@ -545,6 +545,13 @@ object SQLConf { .booleanConf .createWithDefault(false) + val STREAMING_POLLING_DELAY = + SQLConfigBuilder("spark.sql.streaming.pollingDelay") + .internal() + .doc("How long to delay polling new data when no data is available") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(10L) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } http://git-wip-us.apache.org/repos/asf/spark/blob/c399c7f0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 720ffaf..f949652 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -326,6 +326,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { "can not advance manual clock when a stream is not running") verify(currentStream.triggerClock.isInstanceOf[ManualClock], s"can not advance clock of type ${currentStream.triggerClock.getClass}") + val clock = currentStream.triggerClock.asInstanceOf[ManualClock] + // Make sure we don't advance ManualClock too early. See SPARK-16002. + eventually("ManualClock has not yet entered the waiting state") { + assert(clock.isWaiting) + } currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) case StopStream => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org