This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 508b1d81fab [SPARK-39225][CORE] Support
`spark.history.fs.update.batchSize`
508b1d81fab is described below
commit 508b1d81fab3920efed582db4c9b634490b2f26b
Author: Hai Tao <[email protected]>
AuthorDate: Thu May 19 13:09:17 2022 -0700
[SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`
### What changes were proposed in this pull request?
- Add a spark property, `spark.history.fs.update.batchSize`(default 1000),
which specifies the batch size for updating new eventlog files.
- For each eventlog scan/parse process, only a max of
`spark.history.fs.update.batchSize` jobs are scanned/parsed. This allows the
scan process to end within a reasonable time, so new eventlog files (which
appeared after the scan process started) can be scanned sooner.
### Why are the changes needed?
Current Spark History Server suffers when there are a large number of
eventlog files under eventLog.dir: when a SHS starts, the initial scan may take
a long time, and new eventlog files would not be scanned/parsed until the
initial scan completes.
For example, if the initial scan takes 1-2 days (this is not uncommon in
large environments), newly finished Spark jobs would not show up in SHS
since their eventlog files are not scanned/parsed until the initial scan
process finishes. This would result in 1-2 days of SHS malfunctioning, since
the newly finished Spark jobs are the ones most likely to be queried by users.
This PR limits each scan process to
`spark.history.fs.update.batchSize` jobs, so that each scan can finish within a
reasonable time (the default of 1000 jobs can usually be scanned in ~10-15
minutes, but this can vary depending on the sizes of the eventlog files). This
prevents a long initial scan process from blocking newly appeared eventlog
files from being scanned.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally:
- Tested with unit tests in the core module and all passed.
- Placed 10 eventlog files under `eventLog.dir`, and set
`spark.history.fs.update.batchSize` to 2. When the SHS started, observed that
the scan process parsed 2 files each time (starting from the files with the
latest modification time).
- Added a new eventlog file after the first scan started, and observed that
the 2nd scan process picked up and parsed the new eventlog file before parsing
other remaining (older) eventlog files.
Closes #36597 from hai-tao-1/updateBatchSize.
Authored-by: Hai Tao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/history/FsHistoryProvider.scala | 33 +++++++++++---
.../org/apache/spark/internal/config/History.scala | 10 +++++
.../deploy/history/FsHistoryProviderSuite.scala | 50 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 7 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 01b9e7952b1..a2b162468de 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -472,8 +472,11 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
* Builds the application list based on the current contents of the log
directory.
* Tries to reuse as much of the data already in memory as possible, by not
reading
* applications that haven't been updated since last time the logs were
checked.
+ * Only a max of UPDATE_BATCHSIZE jobs are processed in each cycle, to
prevent the process
+ * from running for too long which blocks updating newly appeared eventlog
files.
*/
private[history] def checkForLogs(): Unit = {
+ var count: Int = 0
try {
val newLastScanTime = clock.getTimeMillis()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
@@ -494,6 +497,20 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
}
}
.flatMap { entry => EventLogFileReader(fs, entry) }
+ .filter { reader =>
+ try {
+ reader.modificationTime
+ true
+ } catch {
+ case e: IllegalArgumentException =>
+ logInfo("Exception in getting modificationTime of "
+ + reader.rootPath.getName + ". " + e.toString)
+ false
+ }
+ }
+ .sortWith { case (entry1, entry2) =>
+ entry1.modificationTime > entry2.modificationTime
+ }
.filter { reader =>
try {
val info = listing.read(classOf[LogInfo],
reader.rootPath.toString())
@@ -551,10 +568,15 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// to parse it. This will allow the cleaner code to detect the
file as stale later on
// if it was not possible to parse it.
try {
- listing.write(LogInfo(reader.rootPath.toString(),
newLastScanTime,
- LogType.EventLogs, None, None, reader.fileSizeForLastIndex,
reader.lastIndex,
- None, reader.completed))
- reader.fileSizeForLastIndex > 0
+ if (count < conf.get(UPDATE_BATCHSIZE)) {
+ listing.write(LogInfo(reader.rootPath.toString(),
newLastScanTime,
+ LogType.EventLogs, None, None,
reader.fileSizeForLastIndex, reader.lastIndex,
+ None, reader.completed))
+ count = count + 1
+ reader.fileSizeForLastIndex > 0
+ } else {
+ false
+ }
} catch {
case _: FileNotFoundException => false
case NonFatal(e) =>
@@ -567,9 +589,6 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
false
}
}
- .sortWith { case (entry1, entry2) =>
- entry1.modificationTime > entry2.modificationTime
- }
if (updated.nonEmpty) {
logDebug(s"New/updated attempts found: ${updated.size}
${updated.map(_.rootPath)}")
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 49976842f2d..a35bc944af1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -41,6 +41,16 @@ private[spark] object History {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")
+ val UPDATE_BATCHSIZE = ConfigBuilder("spark.history.fs.update.batchSize")
+ .doc("Specifies the batch size for updating new eventlog files. " +
+ "This controls each scan process to be completed within a reasonable
time, and such " +
+ "prevent the initial scan from running too long and blocking new
eventlog files to " +
+ "be scanned in time in large environments.")
+ .version("3.4.0")
+ .intConf
+ .checkValue(v => v > 0, "The update batchSize should be a positive
integer.")
+ .createWithDefault(Int.MaxValue)
+
val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
.version("1.4.0")
.booleanConf
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 78abbe93b79..8589e948fd1 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1529,6 +1529,56 @@ abstract class FsHistoryProviderSuite extends
SparkFunSuite with Matchers with L
}
}
+ test("SPARK-39225: Support spark.history.fs.update.batchSize") {
+ withTempDir { dir =>
+ val conf = createTestConf(true)
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ conf.set(UPDATE_BATCHSIZE, 1)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val provider = new FsHistoryProvider(conf)
+
+ // Create 1st application
+ val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI,
conf, hadoopConf)
+ writer1.start()
+ writeEventsToRollingWriter(writer1, Seq(
+ SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ writer1.stop()
+
+ // Create 2nd application
+ val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI,
conf, hadoopConf)
+ writer2.start()
+ writeEventsToRollingWriter(writer2, Seq(
+ SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ writer2.stop()
+
+ // The 1st checkForLogs should scan/update app2 only since it is newer
than app1
+ provider.checkForLogs()
+ assert(provider.getListing.length === 1)
+ assert(dir.listFiles().size === 2)
+ assert(provider.getListing().map(e => e.id).contains("app2"))
+ assert(!provider.getListing().map(e => e.id).contains("app1"))
+
+ // Create 3rd application
+ val writer3 = new RollingEventLogFilesWriter("app3", None, dir.toURI,
conf, hadoopConf)
+ writer3.start()
+ writeEventsToRollingWriter(writer3, Seq(
+ SparkListenerApplicationStart("app3", Some("app3"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ writer3.stop()
+
+ // The 2nd checkForLogs should scan/update app3 only since it is newer
than app1
+ provider.checkForLogs()
+ assert(provider.getListing.length === 2)
+ assert(dir.listFiles().size === 3)
+ assert(provider.getListing().map(e => e.id).contains("app3"))
+ assert(!provider.getListing().map(e => e.id).contains("app1"))
+
+ provider.stop()
+ }
+ }
+
test("SPARK-36354: EventLogFileReader should skip rolling event log
directories with no logs") {
withTempDir { dir =>
val conf = createTestConf(true)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]