Repository: spark Updated Branches: refs/heads/master cb19880cd -> da9f067a1
[SPARK-19531] Send UPDATE_LENGTH for Spark History service ## What changes were proposed in this pull request? While writing to the .inprogress file (stored on HDFS) Hadoop doesn't update the file length until close, and therefore Spark's history server can't detect any changes. We have to send UPDATE_LENGTH manually. Author: Oleg Danilov <[email protected]> Closes #16924 from dosoft/SPARK-19531. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da9f067a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da9f067a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da9f067a Branch: refs/heads/master Commit: da9f067a1eae5e92a33a6e688efcf42b35a5f9da Parents: cb19880 Author: Oleg Danilov <[email protected]> Authored: Thu Jul 20 09:38:49 2017 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Thu Jul 20 09:38:49 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/EventLoggingListener.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/da9f067a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 35690b2..00ab2a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI import java.nio.charset.StandardCharsets +import java.util.EnumSet import java.util.Locale import scala.collection.mutable @@ -28,6 +29,8 @@ import 
scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.hdfs.DFSOutputStream +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ @@ -138,7 +141,10 @@ private[spark] class EventLoggingListener( // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) - hadoopDataStream.foreach(_.hflush()) + hadoopDataStream.foreach(ds => ds.getWrappedStream match { + case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)) + case _ => ds.hflush() + }) } if (testing) { loggedEvents += eventJson --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
