Repository: spark
Updated Branches:
  refs/heads/master 9b98d9166 -> 46d2d2c74


[SPARK-24787][CORE] Revert hsync in EventLoggingListener and make 
FsHistoryProvider to read lastBlockBeingWritten data for logs

## What changes were proposed in this pull request?

`hsync` has been added as part of SPARK-19531 to get the latest data in the 
history sever ui, but that is causing the performance overhead and also leading 
to drop many history log events. `hsync` uses the force `FileChannel.force` to 
sync the data to the disk and happens for the data pipeline, it is costly 
operation and making the application to face overhead and drop the events.

I think getting the latest data in history server can be done in different way 
(no impact to application while writing events), there is an api 
`DFSInputStream.getFileLength()` which gives the file length including the 
`lastBlockBeingWrittenLength`(different from `FileStatus.getLen()`), this api 
can be used when the file status length and previously cached length are equal 
to verify whether any new data has been written or not, if there is any update 
in data length then the history server can update the in progress history log. 
And also I made this change as configurable with the default value false, and 
can be enabled for history server if users want to see the updated data in ui.

## How was this patch tested?

Added new test and verified manually, with the added conf 
`spark.history.fs.inProgressAbsoluteLengthCheck.enabled=true`, history server 
is reading the logs including the last block data which is being written and 
updating the Web UI with the latest data.

Closes #22752 from devaraj-kavali/SPARK-24787.

Authored-by: Devaraj K <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46d2d2c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46d2d2c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46d2d2c7

Branch: refs/heads/master
Commit: 46d2d2c74d9aaf30e158aeda58a189f6c8e48b9c
Parents: 9b98d91
Author: Devaraj K <[email protected]>
Authored: Thu Oct 25 13:16:08 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Oct 25 13:16:08 2018 -0700

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 22 ++++++++++--
 .../spark/scheduler/EventLoggingListener.scala  |  8 +----
 .../deploy/history/FsHistoryProviderSuite.scala | 37 ++++++++++++++++++--
 3 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c23a659..c4517d3 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
@@ -449,7 +449,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
               listing.write(info.copy(lastProcessed = newLastScanTime, 
fileSize = entry.getLen()))
             }
 
-            if (info.fileSize < entry.getLen()) {
+            if (shouldReloadLog(info, entry)) {
               if (info.appId.isDefined && fastInProgressParsing) {
                 // When fast in-progress parsing is on, we don't need to 
re-parse when the
                 // size changes, but we do need to invalidate any existing UIs.
@@ -541,6 +541,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     }
   }
 
+  private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): 
Boolean = {
+    var result = info.fileSize < entry.getLen
+    if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+      try {
+        result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
+          in.getWrappedStream match {
+            case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength
+            case _ => false
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logDebug(s"Failed to check the length for the file : 
${info.logPath}", e)
+      }
+    }
+    result
+  }
+
   private def cleanAppData(appId: String, attemptId: Option[String], logPath: 
String): Unit = {
     try {
       val app = load(appId)

http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 1629e17..f89fcd1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.util.EnumSet
 import java.util.Locale
 
 import scala.collection.mutable.{ArrayBuffer, Map}
@@ -28,8 +27,6 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.DFSOutputStream
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
@@ -149,10 +146,7 @@ private[spark] class EventLoggingListener(
     // scalastyle:on println
     if (flushLogger) {
       writer.foreach(_.flush())
-      hadoopDataStream.foreach(ds => ds.getWrappedStream match {
-        case wrapped: DFSOutputStream => 
wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
-        case _ => ds.hflush()
-      })
+      hadoopDataStream.foreach(_.hflush())
     }
     if (testing) {
       loggedEvents += eventJson

http://git-wip-us.apache.org/repos/asf/spark/blob/46d2d2c7/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 444e8d6..6a761d4 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,8 +27,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
 import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatcher
@@ -856,6 +856,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
BeforeAndAfter with Matc
     assert(!mockedProvider.isBlacklisted(accessDeniedPath))
   }
 
+  test("check in-progress event logs absolute length") {
+    val path = new Path("testapp.inprogress")
+    val provider = new FsHistoryProvider(createTestConf())
+    val mockedProvider = spy(provider)
+    val mockedFs = mock(classOf[FileSystem])
+    val in = mock(classOf[FSDataInputStream])
+    val dfsIn = mock(classOf[DFSInputStream])
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    when(mockedFs.open(path)).thenReturn(in)
+    when(in.getWrappedStream).thenReturn(dfsIn)
+    when(dfsIn.getFileLength).thenReturn(200)
+    // FileStatus.getLen is more than logInfo fileSize
+    var fileStatus = new FileStatus(200, false, 0, 0, 0, path)
+    var logInfo = new LogInfo(path.toString, 0, Some("appId"), 
Some("attemptId"), 100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+
+    fileStatus = new FileStatus()
+    fileStatus.setPath(path)
+    // DFSInputStream.getFileLength is more than logInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 
100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // DFSInputStream.getFileLength is equal to logInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 
200)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // in.getWrappedStream returns other than DFSInputStream
+    val bin = mock(classOf[BufferedInputStream])
+    when(in.getWrappedStream).thenReturn(bin)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // fs.open throws exception
+    when(mockedFs.open(path)).thenThrow(new IOException("Throwing 
intentionally"))
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform 
checks on the updated
    * app list. Example:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to