This is an automated email from the ASF dual-hosted git repository.
prashant pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new f2b80c7 [SPARK-32221][K8S] Avoid possible errors due to incorrect
file size or type supplied in spark conf
f2b80c7 is described below
commit f2b80c78dfaed25bd444d71847c9483fbe4a4115
Author: Prashant Sharma <[email protected]>
AuthorDate: Wed Jan 6 14:55:40 2021 +0530
[SPARK-32221][K8S] Avoid possible errors due to incorrect file size or type
supplied in spark conf
### What changes were proposed in this pull request?
Skip files that are binary or too large to fit within the configMap's max size.
### Why are the changes needed?
A configMap cannot hold binary files, and there is also a limit on how much
data a configMap can hold.
This limit can be configured by the k8s cluster admin. This PR skips such
files (with a warning) instead of failing with obscure runtime errors.
If such files were not skipped, the result would be mount errors, or
encoding errors when binary files are submitted.
### Does this PR introduce _any_ user-facing change?
Yes. In simple words, it avoids possible errors due to negligence (for example,
placing a large file or a binary file in SPARK_CONF_DIR) and thus improves the
user experience.
### How was this patch tested?
Added relevant tests and improved existing tests.
Closes #30472 from ScrapCodes/SPARK-32221/avoid-conf-propagate-errors.
Lead-authored-by: Prashant Sharma <[email protected]>
Co-authored-by: Prashant Sharma <[email protected]>
Signed-off-by: Prashant Sharma <[email protected]>
(cherry picked from commit f64dfa8727b785f333a0c10f5f7175ab51f22764)
Signed-off-by: Prashant Sharma <[email protected]>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 8 +++
.../deploy/k8s/submit/KubernetesClientUtils.scala | 80 +++++++++++++++++-----
.../spark/deploy/k8s/submit/ClientSuite.scala | 21 ++++--
.../k8s/submit/KubernetesClientUtilsSuite.scala | 79 +++++++++++++++++++++
4 files changed, 164 insertions(+), 24 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8232ed3..65ac3c9 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -91,6 +91,14 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)
+ val CONFIG_MAP_MAXSIZE =
+ ConfigBuilder("spark.kubernetes.configMap.maxSize")
+ .doc("Max size limit for a config map. This is configurable as per" +
+ " https://etcd.io/docs/v3.4.0/dev-guide/limit/ on k8s server end.")
+ .version("3.1.0")
+ .longConf
+ .createWithDefault(1572864) // 1.5 MiB
+
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX =
"spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
index 32f630f..4207077 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
@@ -18,15 +18,17 @@
package org.apache.spark.deploy.k8s.submit
import java.io.{File, StringWriter}
+import java.nio.charset.MalformedInputException
import java.util.Properties
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.io.{Codec, Source}
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, KeyToPath}
import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.{Constants, KubernetesUtils}
+import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
import org.apache.spark.internal.Logging
@@ -54,8 +56,10 @@ private[spark] object KubernetesClientUtils extends Logging {
/**
* Build, file -> 'file's content' map of all the selected files in
SPARK_CONF_DIR.
*/
- def buildSparkConfDirFilesMap(configMapName: String,
- sparkConf: SparkConf, resolvedPropertiesMap: Map[String, String]):
Map[String, String] = {
+ def buildSparkConfDirFilesMap(
+ configMapName: String,
+ sparkConf: SparkConf,
+ resolvedPropertiesMap: Map[String, String]): Map[String, String] =
synchronized {
val loadedConfFilesMap =
KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
// Add resolved spark conf to the loaded configuration files map.
if (resolvedPropertiesMap.nonEmpty) {
@@ -90,29 +94,71 @@ private[spark] object KubernetesClientUtils extends Logging
{
.build()
}
- private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
+ private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
+ val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length +
f.length()))
+ // sort first by name and then by length, so that during tests we have
consistent results.
+ fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)
+ }
+
+ // exposed for testing
+ private[submit] def loadSparkConfDirFiles(conf: SparkConf): Map[String,
String] = {
val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
conf.getOption("spark.home").map(dir => s"$dir/conf"))
+ val maxSize = conf.get(Config.CONFIG_MAP_MAXSIZE)
if (confDir.isDefined) {
- val confFiles = listConfFiles(confDir.get)
- logInfo(s"Spark configuration files loaded from $confDir :
${confFiles.mkString(",")}")
- confFiles.map { file =>
- val source = Source.fromFile(file)(Codec.UTF8)
- val mapping = (file.getName -> source.mkString)
- source.close()
- mapping
- }.toMap
+ val confFiles: Seq[File] = listConfFiles(confDir.get, maxSize)
+ val orderedConfFiles = orderFilesBySize(confFiles)
+ var truncatedMapSize: Long = 0
+ val truncatedMap = mutable.HashMap[String, String]()
+ val skippedFiles = mutable.HashSet[String]()
+ var source: Source = Source.fromString("") // init with empty source.
+ for (file <- orderedConfFiles) {
+ try {
+ source = Source.fromFile(file)(Codec.UTF8)
+ val (fileName, fileContent) = file.getName -> source.mkString
+ if ((truncatedMapSize + fileName.length + fileContent.length) <
maxSize) {
+ truncatedMap.put(fileName, fileContent)
+ truncatedMapSize = truncatedMapSize + (fileName.length +
fileContent.length)
+ } else {
+ skippedFiles.add(fileName)
+ }
+ } catch {
+ case e: MalformedInputException =>
+ logWarning(
+ s"Unable to read a non UTF-8 encoded file
${file.getAbsolutePath}. Skipping...", e)
+ None
+ } finally {
+ source.close()
+ }
+ }
+ if (truncatedMap.nonEmpty) {
+ logInfo(s"Spark configuration files loaded from $confDir :" +
+ s" ${truncatedMap.keys.mkString(",")}")
+ }
+ if (skippedFiles.nonEmpty) {
+ logWarning(s"Skipped conf file(s) ${skippedFiles.mkString(",")}, due
to size constraint." +
+ s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for more
details.")
+ }
+ truncatedMap.toMap
} else {
Map.empty[String, String]
}
}
- private def listConfFiles(confDir: String): Seq[File] = {
- // We exclude all the template files and user provided spark conf or
properties.
- // As spark properties are resolved in a different step.
+ private def listConfFiles(confDir: String, maxSize: Long): Seq[File] = {
+ // At the moment configmaps do not support storing binary content (i.e.
skip jar,tar,gzip,zip),
+ // and configMaps do not allow for size greater than 1.5 MiB(configurable).
+ // https://etcd.io/docs/v3.4.0/dev-guide/limit/
+ def testIfTooLargeOrBinary(f: File): Boolean = (f.length() +
f.getName.length > maxSize) ||
+ f.getName.matches(".*\\.(gz|zip|jar|tar)")
+
+ // We exclude all the template files and user provided spark conf or
properties,
+ // Spark properties are resolved in a different step.
+ def testIfSparkConfOrTemplates(f: File) =
f.getName.matches(".*\\.template") ||
+ f.getName.matches("spark.*(conf|properties)")
+
val fileFilter = (f: File) => {
- f.isFile && !(f.getName.endsWith("template") ||
- f.getName.matches("spark.*(conf|properties)"))
+ f.isFile && !testIfTooLargeOrBinary(f) && !testIfSparkConfOrTemplates(f)
}
val confFiles: Seq[File] = {
val dir = new File(confDir)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 1a14d52..18d0c00 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -191,25 +191,32 @@ class ClientSuite extends SparkFunSuite with
BeforeAndAfter {
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
}
- test("All files from SPARK_CONF_DIR, except templates and spark config " +
+ test("All files from SPARK_CONF_DIR, " +
+ "except templates, spark config, binary files and are within size limit, "
+
"should be populated to pod's configMap.") {
def testSetup: (SparkConf, Seq[String]) = {
val tempDir = Utils.createTempDir()
- val sparkConf = new SparkConf(loadDefaults =
false).setSparkHome(tempDir.getAbsolutePath)
+ val sparkConf = new SparkConf(loadDefaults = false)
+ .setSparkHome(tempDir.getAbsolutePath)
val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
tempConfDir.mkdir()
// File names - which should not get mounted on the resultant config map.
val filteredConfFileNames =
- Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf")
- val confFileNames = for (i <- 1 to 5) yield s"testConf.$i" ++
+ Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf",
+ "test.gz", "test2.jar", "non_utf8.txt")
+ val confFileNames = (for (i <- 1 to 5) yield s"testConf.$i") ++
List("spark-env.sh") ++ filteredConfFileNames
- val testConfFiles = for (i <- confFileNames) yield {
+ val testConfFiles = (for (i <- confFileNames) yield {
val file = new File(s"${tempConfDir.getAbsolutePath}/$i")
- Files.write(file.toPath,
"conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
+ if (i.startsWith("non_utf8")) { // filling some non-utf-8 binary
+ Files.write(file.toPath, Array[Byte](0x00.toByte, 0xA1.toByte))
+ } else {
+ Files.write(file.toPath,
"conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
+ }
file.getName
- }
+ })
assert(tempConfDir.listFiles().length == confFileNames.length)
val expectedConfFiles: Seq[String] =
testConfFiles.filterNot(filteredConfFileNames.contains)
(sparkConf, expectedConfFiles)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
new file mode 100644
index 0000000..ee672cc
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config
+import org.apache.spark.util.Utils
+
+class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
+
+ def testSetup(inputFiles: Map[String, Array[Byte]]): SparkConf = {
+ val tempDir = Utils.createTempDir()
+ val sparkConf = new SparkConf(loadDefaults = false)
+ .setSparkHome(tempDir.getAbsolutePath)
+
+ val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
+ tempConfDir.mkdir()
+ for (i <- inputFiles) yield {
+ val file = new File(s"${tempConfDir.getAbsolutePath}/${i._1}")
+ Files.write(file.toPath, i._2)
+ file.getName
+ }
+ sparkConf
+ }
+
+ test("verify load files, loads only allowed files and not the disallowed
files.") {
+ val input: Map[String, Array[Byte]] = Map("test.txt" -> "test123",
"z12.zip" -> "zZ",
+ "rere.jar" -> "@31", "spark.jar" -> "@31", "_test" -> "", "sample.conf"
-> "conf")
+ .map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)) ++
+ Map("binary-file.conf" -> Array[Byte](0x00.toByte, 0xA1.toByte))
+ val sparkConf = testSetup(input)
+ val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+ val expectedOutput = Map("test.txt" -> "test123", "sample.conf" -> "conf",
"_test" -> "")
+ assert(output === expectedOutput)
+ }
+
+ test("verify load files, truncates the content to maxSize, when keys are
very large in number.") {
+ val input = (for (i <- 10000 to 1 by -1) yield (s"testConf.${i}" ->
"test123456")).toMap
+ val sparkConf = testSetup(input.map(f => f._1 ->
f._2.getBytes(StandardCharsets.UTF_8)))
+ .set(Config.CONFIG_MAP_MAXSIZE.key, "60")
+ val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+ val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" ->
"test123456")
+ assert(output === expectedOutput)
+ val output1 = KubernetesClientUtils.loadSparkConfDirFiles(
+ sparkConf.set(Config.CONFIG_MAP_MAXSIZE.key, "250000"))
+ assert(output1 === input)
+ }
+
+ test("verify load files, truncates the content to maxSize, when keys are
equal in length.") {
+ val input = (for (i <- 9 to 1 by -1) yield (s"testConf.${i}" ->
"test123456")).toMap
+ val sparkConf = testSetup(input.map(f => f._1 ->
f._2.getBytes(StandardCharsets.UTF_8)))
+ .set(Config.CONFIG_MAP_MAXSIZE.key, "80")
+ val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+ val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" ->
"test123456",
+ "testConf.3" -> "test123456")
+ assert(output === expectedOutput)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]