Repository: spark
Updated Branches:
  refs/heads/master e4cb42ad8 -> af3b81607


[SPARK-25855][CORE] Don't use erasure coding for event logs by default

## What changes were proposed in this pull request?

This turns off HDFS erasure coding by default for event logs, regardless of 
filesystem defaults.  Because this requires APIs only available in Hadoop 3, 
it uses reflection.  Erasure coding is a poor choice for event logs, as 
hflush() is a no-op on erasure-coded files, so updates to the file are not 
visible for a long time.  The old behavior can still be restored by setting 
"spark.eventLog.allowErasureCoding=true", which falls back to filesystem 
defaults.
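
For illustration, this is how an application would opt back in to the old behavior (a hedged sketch: only the config key comes from this patch, the rest is standard SparkConf boilerplate):

```scala
import org.apache.spark.SparkConf

// Let event logs follow filesystem defaults again, which on an HDFS
// erasure-coded directory means the logs may be erasure coded.
// The default after this patch is false.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.allowErasureCoding", "true")
```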

## How was this patch tested?

Deployed a cluster with these changes and HDFS erasure coding enabled.  By 
default, event logs did not use erasure coding, but setting the new 
configuration still allowed it.  Also tried writing to the local filesystem 
(which doesn't support erasure coding at all), and everything worked fine.
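
For readers who want to repeat that check, the sketch below shows one way to inspect a written log on a Hadoop 3 cluster (the path is hypothetical; `getErasureCodingPolicy` is the HDFS client API, not part of this patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.DistributedFileSystem

// Point at an event log written by some application (path is illustrative).
val logPath = new Path("hdfs:///spark-events/app-20181031105251-0001")
val fs = logPath.getFileSystem(new Configuration())
  .asInstanceOf[DistributedFileSystem]

// Null (or, on newer Hadoop versions, the built-in replication policy) means
// the file was written with plain replication rather than erasure coding.
val policy = Option(fs.getErasureCodingPolicy(logPath))
println(policy.map(_.getName).getOrElse("replicated"))
```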

Closes #22881 from squito/SPARK-25855.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af3b8160
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af3b8160
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af3b8160

Branch: refs/heads/master
Commit: af3b8160704b27dd8ed2b95b61edeec6968685be
Parents: e4cb42a
Author: Imran Rashid <[email protected]>
Authored: Wed Oct 31 10:52:51 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Oct 31 10:52:51 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 32 +++++++++++++++++++-
 .../apache/spark/internal/config/package.scala  |  5 +++
 .../spark/scheduler/EventLoggingListener.scala  |  7 ++++-
 docs/configuration.md                           | 11 +++++++
 4 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 4cc0063..78a7cf6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -30,7 +31,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -471,4 +472,33 @@ object SparkHadoopUtil {
       hadoopConf.set(key.substring("spark.hadoop.".length), value)
     }
   }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Create a path that uses replication instead of erasure coding (ec), regardless of the default
+   * configuration in hdfs for the given path.  This can be helpful as hdfs ec doesn't support
+   * hflush(), hsync(), or append()
+   * https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations
+   */
+  // scalastyle:on line.size.limit
+  def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
+    try {
+      // Use reflection as this uses APIs only available in Hadoop 3
+      val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
+      val builder = builderMethod.invoke(fs, path)
+      val builderCls = builder.getClass()
+      // this may throw a NoSuchMethodException if the path is not on hdfs
+      val replicateMethod = builderCls.getMethod("replicate")
+      val buildMethod = builderCls.getMethod("build")
+      val b2 = replicateMethod.invoke(builder)
+      buildMethod.invoke(b2).asInstanceOf[FSDataOutputStream]
+    } catch {
+      case _: NoSuchMethodException =>
+        // No createFile() method, we're using an older hdfs client, which doesn't give us control
+        // over EC vs. replication.  Older hdfs doesn't have EC anyway, so just create a file with
+        // old apis.
+        fs.create(path)
+    }
+  }
+
 }
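
For reference, on a Hadoop 3 HDFS client the reflective calls above resolve to the builder API roughly as follows (a sketch only, not part of the patch; Spark cannot call this directly while it still compiles against Hadoop 2):

```scala
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem

def createNonECFileDirect(fs: DistributedFileSystem, path: Path): FSDataOutputStream =
  fs.createFile(path) // returns an HDFS-specific output stream builder
    .replicate()      // request plain replication instead of erasure coding
    .build()
```

The NoSuchMethodException fallback in the patch covers both older clients and non-HDFS filesystems, whose builders lack replicate().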

http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e8b1d88..356cf9e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -58,6 +58,11 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ALLOW_EC =
+    ConfigBuilder("spark.eventLog.allowErasureCoding")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_TESTING =
     ConfigBuilder("spark.eventLog.testing")
       .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index f89fcd1..788b23d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -67,6 +67,7 @@ private[spark] class EventLoggingListener(
   private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
   private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
   private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
   private val testing = sparkConf.get(EVENT_LOG_TESTING)
   private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
@@ -119,7 +120,11 @@ private[spark] class EventLoggingListener(
       if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
         new FileOutputStream(uri.getPath)
       } else {
-        hadoopDataStream = Some(fileSystem.create(path))
+        hadoopDataStream = Some(if (shouldAllowECLogs) {
+          fileSystem.create(path)
+        } else {
+          SparkHadoopUtil.createNonECFile(fileSystem, path)
+        })
         hadoopDataStream.get
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 432b4cd..8cb0ed1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -762,6 +762,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.eventLog.allowErasureCoding</code></td>
+  <td>false</td>
+  <td>
+    Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of
+    filesystem defaults.  On HDFS, erasure-coded files will not update as quickly as regular
+    replicated files, so application updates will take longer to appear in the History Server.
+    Note that even if this is true, Spark will still not force the file to use erasure coding; it
+    will simply use filesystem defaults.
+  </td>
+</tr>
+<tr>
   <td><code>spark.eventLog.dir</code></td>
   <td>file:///tmp/spark-events</td>
   <td>

