This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 508b1d81fab [SPARK-39225][CORE] Support 
`spark.history.fs.update.batchSize`
508b1d81fab is described below

commit 508b1d81fab3920efed582db4c9b634490b2f26b
Author: Hai Tao <[email protected]>
AuthorDate: Thu May 19 13:09:17 2022 -0700

    [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`
    
    ### What changes were proposed in this pull request?
    - Add a Spark property, `spark.history.fs.update.batchSize` (default 
`Int.MaxValue`), which specifies the batch size for updating new eventlog files.
    - For each eventlog scan/parse process, only a max of 
`spark.history.fs.update.batchSize` jobs are scanned/parsed. This allows the 
scan process to end within a reasonable time, so new eventlog files (appearing 
after the scan process starts) can be scanned sooner.
    
    ### Why are the changes needed?
    
    The current Spark History Server suffers when there are a large number of 
eventlog files under eventLog.dir: when the SHS starts, the initial scan may take 
a long time, and new eventlog files would not be scanned/parsed until the 
initial scan completes.
    
    For example, if the initial scan takes 1-2 days (this is not uncommon in 
large environments), the newly finished Spark jobs would not show up in the SHS 
since their eventlog files are not scanned/parsed until the initial scan 
process finishes. This would result in 1-2 days of SHS malfunction, since the 
newly finished Spark jobs are the most likely to be queried by users.
    
    This PR adds a limit of `spark.history.fs.update.batchSize` for each scan 
process, so that each scan can finish within a reasonable time (a batch of 
1000 jobs can usually be scanned in ~10-15 mins, but this can vary depending 
on the sizes of the eventlog files). This will prevent the long initial scan 
process from blocking newly appeared eventlog files from being scanned.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested locally:
    
    - Tested with unit tests in the core module and all passed.
    - Placed 10 eventlog files under `eventLog.dir`, and set 
`spark.history.fs.update.batchSize` to 2. When the SHS started, observed that 
the scan process parsed 2 files each time (starting with the files with the 
latest modification time).
    - Added a new eventlog file after the first scan started, and observed that 
the 2nd scan process picked up and parsed the new eventlog file before parsing 
other remaining (older) eventlog files.
    
    Closes #36597 from hai-tao-1/updateBatchSize.
    
    Authored-by: Hai Tao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 33 +++++++++++---
 .../org/apache/spark/internal/config/History.scala | 10 +++++
 .../deploy/history/FsHistoryProviderSuite.scala    | 50 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 01b9e7952b1..a2b162468de 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -472,8 +472,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
    * Builds the application list based on the current contents of the log 
directory.
    * Tries to reuse as much of the data already in memory as possible, by not 
reading
    * applications that haven't been updated since last time the logs were 
checked.
+   * Only a max of UPDATE_BATCHSIZE jobs are processed in each cycle, to 
prevent the process
+   * from running for too long which blocks updating newly appeared eventlog 
files.
    */
   private[history] def checkForLogs(): Unit = {
+    var count: Int = 0
     try {
       val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
@@ -494,6 +497,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
           }
         }
         .flatMap { entry => EventLogFileReader(fs, entry) }
+        .filter { reader =>
+          try {
+            reader.modificationTime
+            true
+          } catch {
+            case e: IllegalArgumentException =>
+              logInfo("Exception in getting modificationTime of "
+                + reader.rootPath.getName + ". " + e.toString)
+              false
+          }
+        }
+        .sortWith { case (entry1, entry2) =>
+          entry1.modificationTime > entry2.modificationTime
+        }
         .filter { reader =>
           try {
             val info = listing.read(classOf[LogInfo], 
reader.rootPath.toString())
@@ -551,10 +568,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               // to parse it. This will allow the cleaner code to detect the 
file as stale later on
               // if it was not possible to parse it.
               try {
-                listing.write(LogInfo(reader.rootPath.toString(), 
newLastScanTime,
-                  LogType.EventLogs, None, None, reader.fileSizeForLastIndex, 
reader.lastIndex,
-                  None, reader.completed))
-                reader.fileSizeForLastIndex > 0
+                if (count < conf.get(UPDATE_BATCHSIZE)) {
+                  listing.write(LogInfo(reader.rootPath.toString(), 
newLastScanTime,
+                    LogType.EventLogs, None, None, 
reader.fileSizeForLastIndex, reader.lastIndex,
+                    None, reader.completed))
+                  count = count + 1
+                  reader.fileSizeForLastIndex > 0
+                } else {
+                  false
+                }
               } catch {
                 case _: FileNotFoundException => false
                 case NonFatal(e) =>
@@ -567,9 +589,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               false
           }
         }
-        .sortWith { case (entry1, entry2) =>
-          entry1.modificationTime > entry2.modificationTime
-        }
 
       if (updated.nonEmpty) {
         logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.rootPath)}")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala 
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 49976842f2d..a35bc944af1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -41,6 +41,16 @@ private[spark] object History {
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("10s")
 
+  val UPDATE_BATCHSIZE = ConfigBuilder("spark.history.fs.update.batchSize")
+    .doc("Specifies the batch size for updating new eventlog files. " +
+      "This controls each scan process to be completed within a reasonable 
time, and such " +
+      "prevent the initial scan from running too long and blocking new 
eventlog files to " +
+      "be scanned in time in large environments.")
+    .version("3.4.0")
+    .intConf
+    .checkValue(v => v > 0, "The update batchSize should be a positive 
integer.")
+    .createWithDefault(Int.MaxValue)
+
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .version("1.4.0")
     .booleanConf
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 78abbe93b79..8589e948fd1 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1529,6 +1529,56 @@ abstract class FsHistoryProviderSuite extends 
SparkFunSuite with Matchers with L
     }
   }
 
+  test("SPARK-39225: Support spark.history.fs.update.batchSize") {
+    withTempDir { dir =>
+      val conf = createTestConf(true)
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(UPDATE_BATCHSIZE, 1)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val provider = new FsHistoryProvider(conf)
+
+      // Create 1st application
+      val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, 
conf, hadoopConf)
+      writer1.start()
+      writeEventsToRollingWriter(writer1, Seq(
+        SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      writer1.stop()
+
+      // Create 2nd application
+      val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, 
conf, hadoopConf)
+      writer2.start()
+      writeEventsToRollingWriter(writer2, Seq(
+        SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      writer2.stop()
+
+      // The 1st checkForLogs should scan/update app2 only since it is newer 
than app1
+      provider.checkForLogs()
+      assert(provider.getListing.length === 1)
+      assert(dir.listFiles().size === 2)
+      assert(provider.getListing().map(e => e.id).contains("app2"))
+      assert(!provider.getListing().map(e => e.id).contains("app1"))
+
+      // Create 3rd application
+      val writer3 = new RollingEventLogFilesWriter("app3", None, dir.toURI, 
conf, hadoopConf)
+      writer3.start()
+      writeEventsToRollingWriter(writer3, Seq(
+        SparkListenerApplicationStart("app3", Some("app3"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      writer3.stop()
+
+      // The 2nd checkForLogs should scan/update app3 only since it is newer 
than app1
+      provider.checkForLogs()
+      assert(provider.getListing.length === 2)
+      assert(dir.listFiles().size === 3)
+      assert(provider.getListing().map(e => e.id).contains("app3"))
+      assert(!provider.getListing().map(e => e.id).contains("app1"))
+
+      provider.stop()
+    }
+  }
+
   test("SPARK-36354: EventLogFileReader should skip rolling event log 
directories with no logs") {
     withTempDir { dir =>
       val conf = createTestConf(true)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to