Repository: spark
Updated Branches:
  refs/heads/master e4cb42ad8 -> af3b81607
[SPARK-25855][CORE] Don't use erasure coding for event logs by default

## What changes were proposed in this pull request?

This turns off HDFS erasure coding by default for event logs, regardless of filesystem defaults. Because this requires APIs only available in Hadoop 3, it uses reflection. EC isn't a very good choice for event logs, as hflush() is a no-op there, so updates to the file are not visible for a long time. This can still be configured by setting "spark.eventLog.allowErasureCoding=true", which will use filesystem defaults.

## How was this patch tested?

Deployed a cluster with the changes and HDFS EC on. By default, event logs didn't use EC, but the configuration would still allow EC. Also tried writing to the local fs (which doesn't support EC at all) and things worked fine.

Closes #22881 from squito/SPARK-25855.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af3b8160
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af3b8160
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af3b8160

Branch: refs/heads/master
Commit: af3b8160704b27dd8ed2b95b61edeec6968685be
Parents: e4cb42a
Author: Imran Rashid <[email protected]>
Authored: Wed Oct 31 10:52:51 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Oct 31 10:52:51 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 32 +++++++++++++++++++-
 .../apache/spark/internal/config/package.scala  |  5 +++
 .../spark/scheduler/EventLoggingListener.scala  |  7 ++++-
 docs/configuration.md                           | 11 +++++++
 4 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 4cc0063..78a7cf6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -30,7 +31,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -471,4 +472,33 @@ object SparkHadoopUtil {
       hadoopConf.set(key.substring("spark.hadoop.".length), value)
     }
   }
+
+  // scalastyle:off line.size.limit
+  /**
+   * Create a path that uses replication instead of erasure coding (ec), regardless of the default
+   * configuration in hdfs for the given path. This can be helpful as hdfs ec doesn't support
+   * hflush(), hsync(), or append():
+   * https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations
+   */
+  // scalastyle:on line.size.limit
+  def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
+    try {
+      // Use reflection, as this uses APIs only available in Hadoop 3
+      val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
+      val builder = builderMethod.invoke(fs, path)
+      val builderCls = builder.getClass()
+      // this may throw a NoSuchMethodException if the path is not on hdfs
+      val replicateMethod = builderCls.getMethod("replicate")
+      val buildMethod = builderCls.getMethod("build")
+      val b2 = replicateMethod.invoke(builder)
+      buildMethod.invoke(b2).asInstanceOf[FSDataOutputStream]
+    } catch {
+      case _: NoSuchMethodException =>
+        // No createFile() method: we're using an older hdfs client, which doesn't give us control
+        // over EC vs. replication. Older hdfs doesn't have EC anyway, so just create a file with
+        // the old APIs.
+        fs.create(path)
+    }
+  }
+
 }
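For reference, the reflective calls above are roughly what a direct Hadoop 3 call would look like against DistributedFileSystem's createFile() builder. A minimal sketch (this only compiles against a Hadoop 3.x client, which is exactly why the patch goes through reflection instead):

```scala
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem

// Direct Hadoop 3 equivalent of createNonECFile(): the createFile() builder
// with replicate() forces a replicated (non-erasure-coded) file, so
// hflush()/hsync() make writes visible to readers such as the History Server.
def createReplicatedFile(fs: DistributedFileSystem, path: Path): FSDataOutputStream =
  fs.createFile(path).replicate().build()
```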
http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e8b1d88..356cf9e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -58,6 +58,11 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val EVENT_LOG_ALLOW_EC =
+    ConfigBuilder("spark.eventLog.allowErasureCoding")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_TESTING =
     ConfigBuilder("spark.eventLog.testing")
       .internal()


http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index f89fcd1..788b23d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -67,6 +67,7 @@ private[spark] class EventLoggingListener(
   private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
   private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
   private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
   private val testing = sparkConf.get(EVENT_LOG_TESTING)
   private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
@@ -119,7 +120,11 @@ private[spark] class EventLoggingListener(
     if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
       new FileOutputStream(uri.getPath)
     } else {
-      hadoopDataStream = Some(fileSystem.create(path))
+      hadoopDataStream = Some(if (shouldAllowECLogs) {
+        fileSystem.create(path)
+      } else {
+        SparkHadoopUtil.createNonECFile(fileSystem, path)
+      })
       hadoopDataStream.get
     }
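The new flag defaults to false, so replicated event logs are the out-of-the-box behavior. To opt back into filesystem defaults (for example, if delayed visibility in the History Server is acceptable), the flag is set like any other Spark conf; a small sketch, where the app name and log directory are made-up examples:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Opt back into filesystem defaults: with allowErasureCoding=true, Spark calls
// plain fileSystem.create(path), so the event log may end up erasure coded if
// the target directory has an EC policy.
val conf = new SparkConf()
  .setAppName("ec-event-log-demo")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-logs") // example path
  .set("spark.eventLog.allowErasureCoding", "true")
val sc = new SparkContext(conf)
```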
http://git-wip-us.apache.org/repos/asf/spark/blob/af3b8160/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 432b4cd..8cb0ed1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -762,6 +762,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.eventLog.allowErasureCoding</code></td>
+  <td>false</td>
+  <td>
+    Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of
+    filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular
+    replicated files, so application updates will take longer to appear in the History Server.
+    Note that even if this is true, Spark will still not force the file to use erasure coding;
+    it will simply use filesystem defaults.
+  </td>
+</tr>
+<tr>
   <td><code>spark.eventLog.dir</code></td>
   <td>file:///tmp/spark-events</td>
   <td>
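To spot-check what a cluster actually did (as in the manual test above), a Hadoop 3 FileStatus reports whether a file ended up erasure coded. A sketch with a made-up log directory and application id:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// After an application finishes, inspect its event log file. With the default
// allowErasureCoding=false this should print "erasure coded: false" even when
// the log directory has an EC policy set.
val fs = FileSystem.get(new URI("hdfs:///spark-logs"), new Configuration())
val status = fs.getFileStatus(new Path("/spark-logs/app-20181031105251-0001"))
println(s"erasure coded: ${status.isErasureCoded}")
```

On Hadoop 3, `hdfs ec -getPolicy -path <file>` gives the same answer from the command line.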
