This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5065a50 [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog 5065a50 is described below commit 5065a50e16076b8c875d1d24bed5742f828adb1c Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> AuthorDate: Fri Apr 24 12:34:44 2020 +0900 [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog ### What changes were proposed in this pull request? This patch adds some log messages to log the elapsed time for the "compact" operation in FileStreamSourceLog and FileStreamSinkLog (added in CompactibleFileStreamLog) to help investigate the mysterious latency spike during the batch run. ### Why are the changes needed? Tracking latency is a critical aspect of a streaming query. While the "compact" operation may bring nontrivial latency (it's even synchronous, adding all the latency to the batch run), it's not measured and end users have to guess. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? N/A for UT. Manual test with a streaming query using a file source & file sink. > grep "for compact batch" <driver log> ``` ... 
20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took 24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359 20/02/20 19:27:39 WARN FileStreamSinkLog: Loaded 1068000 entries (397985432 bytes in memory), and wrote 1068000 entries for compact batch 21359 20/02/20 19:29:52 WARN FileStreamSourceLog: Compacting took 3777 ms (load: 1524 ms, write: 2253 ms) for compact batch 21369 20/02/20 19:29:52 WARN FileStreamSourceLog: Loaded 229477 entries (68970112 bytes in memory), and wrote 229477 entries for compact batch 21369 20/02/20 19:30:17 WARN FileStreamSinkLog: Compacting took 24183 ms (load: 12992 ms, write: 11191 ms) for compact batch 21369 20/02/20 19:30:20 WARN FileStreamSinkLog: Loaded 1068500 entries (398171880 bytes in memory), and wrote 1068500 entries for compact batch 21369 ... ``` ![Screen Shot 2020-02-21 at 12 34 22 PM](https://user-images.githubusercontent.com/1317309/75002142-c6830100-54a6-11ea-8da6-17afb056653b.png) These messages explain why the operation duration peaks every 10 batches, which is the compact interval. Latency from addBatch increases heavily at each peak, which DOES NOT mean it takes more time to write outputs — but without such messages being presented, we have no way to know this. NOTE: The output may be a bit different from the code, as it may be changed a bit during the review phase. Closes #27557 from HeartSaVioR/SPARK-30804. 
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit 39bc50dbf8ca835389f63b47baca129e088c5a19) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../streaming/CompactibleFileStreamLog.scala | 39 +++++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 905bce4..10bcfe6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -28,6 +28,7 @@ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{SizeEstimator, Utils} /** * An abstract class for compactible metadata logs. It will write one log file for each batch. @@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * corresponding `batchId` file. It will delete expired files as well if enabled. 
*/ private def compact(batchId: Long, logs: Array[T]): Boolean = { - val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) - val allLogs = validBatches.flatMap { id => - super.get(id).getOrElse { - throw new IllegalStateException( - s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " + - s"(compactInterval: $compactInterval)") - } - } ++ logs + val (allLogs, loadElapsedMs) = Utils.timeTakenMs { + val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) + validBatches.flatMap { id => + super.get(id).getOrElse { + throw new IllegalStateException( + s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " + + s"(compactInterval: $compactInterval)") + } + } ++ logs + } + val compactedLogs = compactLogs(allLogs) + // Return false as there is another writer. - super.add(batchId, compactLogs(allLogs).toArray) + val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs { + super.add(batchId, compactedLogs.toArray) + } + + val elapsedMs = loadElapsedMs + writeElapsedMs + if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) { + logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," + + s" write: $writeElapsedMs ms) for compact batch $batchId") + logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " + + s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId") + } else { + logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," + + s" write: $writeElapsedMs ms) for compact batch $batchId") + } + + writeSucceed } /** @@ -268,6 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( object CompactibleFileStreamLog { val COMPACT_FILE_SUFFIX = ".compact" + val COMPACT_LATENCY_WARN_THRESHOLD_MS = 2000 def getBatchIdFromFileName(fileName: String): Long = { fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org