This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5065a50  [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog
5065a50 is described below

commit 5065a50e16076b8c875d1d24bed5742f828adb1c
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Fri Apr 24 12:34:44 2020 +0900

    [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog
    
    ### What changes were proposed in this pull request?
    
    This patch adds log messages recording the elapsed time of the "compact" operation in FileStreamSourceLog and FileStreamSinkLog (added in CompactibleFileStreamLog), to help investigate the mysterious latency spikes during batch runs.
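    
    For context, the change wraps the load and write phases in a timing helper and logs the measured durations. A minimal standalone sketch of that pattern (mirroring what `org.apache.spark.util.Utils.timeTakenMs` does; the helper below is an illustration, not the Spark utility itself):
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    // Run a block and return its result together with the elapsed
    // wall-clock time in milliseconds.
    def timeTakenMs[T](body: => T): (T, Long) = {
      val startNs = System.nanoTime()
      val result = body
      (result, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs))
    }
    
    // Usage: wrap an expensive step, then log the measured duration.
    val (sum, elapsedMs) = timeTakenMs {
      (1L to 10000000L).sum // stand-in for the expensive "load" step
    }
    println(s"took $elapsedMs ms, result: $sum")
    ```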
    
    ### Why are the changes needed?
    
    Tracking latency is a critical aspect of a streaming query. The "compact" operation can add nontrivial latency (it is even synchronous, so all of its latency goes into the batch run), yet it is not measured, leaving end users to guess.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A for UT. Manual test with streaming query using file source & file sink.
    
    > grep "for compact batch" <driver log>
    
    ```
    ...
    20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took 24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359
    20/02/20 19:27:39 WARN FileStreamSinkLog: Loaded 1068000 entries (397985432 bytes in memory), and wrote 1068000 entries for compact batch 21359
    20/02/20 19:29:52 WARN FileStreamSourceLog: Compacting took 3777 ms (load: 1524 ms, write: 2253 ms) for compact batch 21369
    20/02/20 19:29:52 WARN FileStreamSourceLog: Loaded 229477 entries (68970112 bytes in memory), and wrote 229477 entries for compact batch 21369
    20/02/20 19:30:17 WARN FileStreamSinkLog: Compacting took 24183 ms (load: 12992 ms, write: 11191 ms) for compact batch 21369
    20/02/20 19:30:20 WARN FileStreamSinkLog: Loaded 1068500 entries (398171880 bytes in memory), and wrote 1068500 entries for compact batch 21369
    ...
    ```
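    
    As a rough illustration, the timing fields can be pulled out of such lines with a small regex-based parser (a hypothetical helper for log analysis, not part of this patch):
    
    ```scala
    // Hypothetical helper: extract total/load/write durations and the batch id
    // from a "Compacting took ..." log line.
    val CompactLine =
      """.*Compacting took (\d+) ms \(load: (\d+) ms, write: (\d+) ms\) for compact batch (\d+)""".r
    
    def parseCompactLine(line: String): Option[(Long, Long, Long, Long)] =
      line match {
        case CompactLine(total, load, write, batch) =>
          Some((total.toLong, load.toLong, write.toLong, batch.toLong))
        case _ => None
      }
    
    // parseCompactLine("20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took " +
    //   "24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359")
    // => Some((24473, 14185, 10288, 21359))
    ```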
    
    ![Screen Shot 2020-02-21 at 12 34 22 PM](https://user-images.githubusercontent.com/1317309/75002142-c6830100-54a6-11ea-8da6-17afb056653b.png)
    
    These messages explain why the operation duration peaks every 10 batches, which is the compact interval. Latency from addBatch increases sharply at each peak, which DOES NOT mean it takes more time to write the outputs; without such messages we have no way to know that.
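    
    A minimal sketch of the cadence behind those peaks (modeled on `CompactibleFileStreamLog.isCompactionBatch`): with the default compact interval of 10, batches 9, 19, 29, ... are compaction batches, so the extra latency lands once every 10 batches.
    
    ```scala
    // Every compactInterval-th batch (counting from 0) is a compaction batch.
    def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
      (batchId + 1) % compactInterval == 0
    
    // With compactInterval = 10, the compaction batches among 0..30:
    val spikes = (0L to 30L).filter(isCompactionBatch(_, 10))
    // spikes contains 9, 19, 29
    ```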
    
    NOTE: The output may differ slightly from the code, as it may have changed a bit during the review phase.
    
    Closes #27557 from HeartSaVioR/SPARK-30804.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 39bc50dbf8ca835389f63b47baca129e088c5a19)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../streaming/CompactibleFileStreamLog.scala       | 39 +++++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 905bce4..10bcfe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -28,6 +28,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SizeEstimator, Utils}
 
 /**
  * An abstract class for compactible metadata logs. It will write one log file for each batch.
@@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * corresponding `batchId` file. It will delete expired files as well if enabled.
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
-    val allLogs = validBatches.flatMap { id =>
-      super.get(id).getOrElse {
-        throw new IllegalStateException(
-          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId 
" +
-            s"(compactInterval: $compactInterval)")
-      }
-    } ++ logs
+    val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
+      val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
+      validBatches.flatMap { id =>
+        super.get(id).getOrElse {
+          throw new IllegalStateException(
+            s"${batchIdToPath(id)} doesn't exist when compacting batch 
$batchId " +
+              s"(compactInterval: $compactInterval)")
+        }
+      } ++ logs
+    }
+    val compactedLogs = compactLogs(allLogs)
+
     // Return false as there is another writer.
-    super.add(batchId, compactLogs(allLogs).toArray)
+    val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
+      super.add(batchId, compactedLogs.toArray)
+    }
+
+    val elapsedMs = loadElapsedMs + writeElapsedMs
+    if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
+      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+      logWarning(s"Loaded ${allLogs.size} entries (estimated 
${SizeEstimator.estimate(allLogs)} " +
+        s"bytes in memory), and wrote ${compactedLogs.size} entries for 
compact batch $batchId")
+    } else {
+      logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+    }
+
+    writeSucceed
   }
 
   /**
@@ -268,6 +288,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
 object CompactibleFileStreamLog {
   val COMPACT_FILE_SUFFIX = ".compact"
+  val COMPACT_LATENCY_WARN_THRESHOLD_MS = 2000
 
   def getBatchIdFromFileName(fileName: String): Long = {
     fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
