Repository: spark Updated Branches: refs/heads/master 9b98d9166 -> 46d2d2c74
[SPARK-24787][CORE] Revert hsync in EventLoggingListener and make FsHistoryProvider to read lastBlockBeingWritten data for logs ## What changes were proposed in this pull request? `hsync` has been added as part of SPARK-19531 to get the latest data in the history sever ui, but that is causing the performance overhead and also leading to drop many history log events. `hsync` uses the force `FileChannel.force` to sync the data to the disk and happens for the data pipeline, it is costly operation and making the application to face overhead and drop the events. I think getting the latest data in history server can be done in different way (no impact to application while writing events), there is an api `DFSInputStream.getFileLength()` which gives the file length including the `lastBlockBeingWrittenLength`(different from `FileStatus.getLen()`), this api can be used when the file status length and previously cached length are equal to verify whether any new data has been written or not, if there is any update in data length then the history server can update the in progress history log. And also I made this change as configurable with the default value false, and can be enabled for history server if users want to see the updated data in ui. ## How was this patch tested? Added new test and verified manually, with the added conf `spark.history.fs.inProgressAbsoluteLengthCheck.enabled=true`, history server is reading the logs including the last block data which is being written and updating the Web UI with the latest data. Closes #22752 from devaraj-kavali/SPARK-24787. Authored-by: Devaraj K <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d2d2c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d2d2c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d2d2c7 Branch: refs/heads/master Commit: 46d2d2c74d9aaf30e158aeda58a189f6c8e48b9c Parents: 9b98d91 Author: Devaraj K <[email protected]> Authored: Thu Oct 25 13:16:08 2018 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Thu Oct 25 13:16:08 2018 -0700 ---------------------------------------------------------------------- .../deploy/history/FsHistoryProvider.scala | 22 ++++++++++-- .../spark/scheduler/EventLoggingListener.scala | 8 +---- .../deploy/history/FsHistoryProviderSuite.scala | 37 ++++++++++++++++++-- 3 files changed, 56 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c23a659..c4517d3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore import com.google.common.io.ByteStreams import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.hdfs.protocol.HdfsConstants import org.apache.hadoop.security.AccessControlException import org.fusesource.leveldbjni.internal.NativeDB @@ -449,7 +449,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen())) } - if (info.fileSize < entry.getLen()) { + if (shouldReloadLog(info, entry)) { if (info.appId.isDefined && fastInProgressParsing) { // When fast in-progress parsing is on, we don't need to re-parse when the // size changes, but we do need to invalidate any existing UIs. @@ -541,6 +541,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = { + var result = info.fileSize < entry.getLen + if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) { + try { + result = Utils.tryWithResource(fs.open(entry.getPath)) { in => + in.getWrappedStream match { + case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength + case _ => false + } + } + } catch { + case e: Exception => + logDebug(s"Failed to check the length for the file : ${info.logPath}", e) + } + } + result + } + private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = { try { val app = load(appId) http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 1629e17..f89fcd1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI import java.nio.charset.StandardCharsets -import java.util.EnumSet import java.util.Locale import scala.collection.mutable.{ArrayBuffer, Map} @@ -28,8 +27,6 @@ import scala.collection.mutable.{ArrayBuffer, Map} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.hdfs.DFSOutputStream -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ @@ -149,10 +146,7 @@ private[spark] class EventLoggingListener( // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) - hadoopDataStream.foreach(ds => ds.getWrappedStream match { - case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)) - case _ => ds.hflush() - }) + hadoopDataStream.foreach(_.hflush()) } if (testing) { loggedEvents += eventJson http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 444e8d6..6a761d4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -27,8 +27,8 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} +import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.security.AccessControlException import org.json4s.jackson.JsonMethods._ import org.mockito.ArgumentMatcher @@ -856,6 +856,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc assert(!mockedProvider.isBlacklisted(accessDeniedPath)) } + test("check in-progress event logs absolute length") { + val path = new Path("testapp.inprogress") + val provider = new FsHistoryProvider(createTestConf()) + val mockedProvider = spy(provider) + val mockedFs = mock(classOf[FileSystem]) + val in = mock(classOf[FSDataInputStream]) + val dfsIn = mock(classOf[DFSInputStream]) + when(mockedProvider.fs).thenReturn(mockedFs) + when(mockedFs.open(path)).thenReturn(in) + when(in.getWrappedStream).thenReturn(dfsIn) + when(dfsIn.getFileLength).thenReturn(200) + // FileStatus.getLen is more than logInfo fileSize + var fileStatus = new FileStatus(200, false, 0, 0, 0, path) + var logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100) + assert(mockedProvider.shouldReloadLog(logInfo, fileStatus)) + + fileStatus = new FileStatus() + fileStatus.setPath(path) + // DFSInputStream.getFileLength is more than logInfo fileSize + logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100) + assert(mockedProvider.shouldReloadLog(logInfo, fileStatus)) + // DFSInputStream.getFileLength is equal to logInfo fileSize + logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 200) + assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + // in.getWrappedStream returns other than DFSInputStream + val bin = mock(classOf[BufferedInputStream]) + when(in.getWrappedStream).thenReturn(bin) + assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + // fs.open throws exception + when(mockedFs.open(path)).thenThrow(new IOException("Throwing intentionally")) + assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
