This is an automated email from the ASF dual-hosted git repository.

prashant pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f2b80c7  [SPARK-32221][K8S] Avoid possible errors due to incorrect 
file size or type supplied in spark conf
f2b80c7 is described below

commit f2b80c78dfaed25bd444d71847c9483fbe4a4115
Author: Prashant Sharma <[email protected]>
AuthorDate: Wed Jan 6 14:55:40 2021 +0530

    [SPARK-32221][K8S] Avoid possible errors due to incorrect file size or type 
supplied in spark conf
    
    ### What changes were proposed in this pull request?
    
    Skip files that are binary or too large to fit within the configMap's max size.
    
    ### Why are the changes needed?
    
    A configMap cannot hold binary files, and there is also a limit on how much 
data a configMap can hold.
    This limit can be configured by the k8s cluster admin. This PR skips such 
files (with a warning) instead of failing with weird runtime errors.
    If such files are not skipped, they result in mount errors, or in 
encoding errors when binary files are submitted.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. In simple terms, it avoids possible errors due to negligence (for example, 
placing a large file or a binary file in SPARK_CONF_DIR) and thus improves the user 
experience.
    
    ### How was this patch tested?
    
    Added relevant tests and improved existing tests.
    
    Closes #30472 from ScrapCodes/SPARK-32221/avoid-conf-propagate-errors.
    
    Lead-authored-by: Prashant Sharma <[email protected]>
    Co-authored-by: Prashant Sharma <[email protected]>
    Signed-off-by: Prashant Sharma <[email protected]>
    (cherry picked from commit f64dfa8727b785f333a0c10f5f7175ab51f22764)
    Signed-off-by: Prashant Sharma <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  8 +++
 .../deploy/k8s/submit/KubernetesClientUtils.scala  | 80 +++++++++++++++++-----
 .../spark/deploy/k8s/submit/ClientSuite.scala      | 21 ++++--
 .../k8s/submit/KubernetesClientUtilsSuite.scala    | 79 +++++++++++++++++++++
 4 files changed, 164 insertions(+), 24 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8232ed3..65ac3c9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -91,6 +91,14 @@ private[spark] object Config extends Logging {
       .toSequence
       .createWithDefault(Nil)
 
+  val CONFIG_MAP_MAXSIZE =
+    ConfigBuilder("spark.kubernetes.configMap.maxSize")
+      .doc("Max size limit for a config map. This is configurable as per" +
+        " https://etcd.io/docs/v3.4.0/dev-guide/limit/ on k8s server end.")
+      .version("3.1.0")
+      .longConf
+      .createWithDefault(1572864) // 1.5 MiB
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = 
"spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = 
"spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = 
"spark.kubernetes.authenticate.driver.mounted"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
index 32f630f..4207077 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala
@@ -18,15 +18,17 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.{File, StringWriter}
+import java.nio.charset.MalformedInputException
 import java.util.Properties
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.io.{Codec, Source}
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, KeyToPath}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.{Constants, KubernetesUtils}
+import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesUtils}
 import org.apache.spark.deploy.k8s.Constants.ENV_SPARK_CONF_DIR
 import org.apache.spark.internal.Logging
 
@@ -54,8 +56,10 @@ private[spark] object KubernetesClientUtils extends Logging {
   /**
    * Build, file -> 'file's content' map of all the selected files in 
SPARK_CONF_DIR.
    */
-  def buildSparkConfDirFilesMap(configMapName: String,
-      sparkConf: SparkConf, resolvedPropertiesMap: Map[String, String]): 
Map[String, String] = {
+  def buildSparkConfDirFilesMap(
+      configMapName: String,
+      sparkConf: SparkConf,
+      resolvedPropertiesMap: Map[String, String]): Map[String, String] = 
synchronized {
     val loadedConfFilesMap = 
KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
     // Add resolved spark conf to the loaded configuration files map.
     if (resolvedPropertiesMap.nonEmpty) {
@@ -90,29 +94,71 @@ private[spark] object KubernetesClientUtils extends Logging 
{
       .build()
   }
 
-  private def loadSparkConfDirFiles(conf: SparkConf): Map[String, String] = {
+  private def orderFilesBySize(confFiles: Seq[File]): Seq[File] = {
+    val fileToFileSizePairs = confFiles.map(f => (f, f.getName.length + 
f.length()))
+    // sort first by name and then by length, so that during tests we have 
consistent results.
+    fileToFileSizePairs.sortBy(f => f._1).sortBy(f => f._2).map(_._1)
+  }
+
+  // exposed for testing
+  private[submit] def loadSparkConfDirFiles(conf: SparkConf): Map[String, 
String] = {
     val confDir = Option(conf.getenv(ENV_SPARK_CONF_DIR)).orElse(
       conf.getOption("spark.home").map(dir => s"$dir/conf"))
+    val maxSize = conf.get(Config.CONFIG_MAP_MAXSIZE)
     if (confDir.isDefined) {
-      val confFiles = listConfFiles(confDir.get)
-      logInfo(s"Spark configuration files loaded from $confDir : 
${confFiles.mkString(",")}")
-      confFiles.map { file =>
-        val source = Source.fromFile(file)(Codec.UTF8)
-        val mapping = (file.getName -> source.mkString)
-        source.close()
-        mapping
-      }.toMap
+      val confFiles: Seq[File] = listConfFiles(confDir.get, maxSize)
+      val orderedConfFiles = orderFilesBySize(confFiles)
+      var truncatedMapSize: Long = 0
+      val truncatedMap = mutable.HashMap[String, String]()
+      val skippedFiles = mutable.HashSet[String]()
+      var source: Source = Source.fromString("") // init with empty source.
+      for (file <- orderedConfFiles) {
+        try {
+          source = Source.fromFile(file)(Codec.UTF8)
+          val (fileName, fileContent) = file.getName -> source.mkString
+          if ((truncatedMapSize + fileName.length + fileContent.length) < 
maxSize) {
+            truncatedMap.put(fileName, fileContent)
+            truncatedMapSize = truncatedMapSize + (fileName.length + 
fileContent.length)
+          } else {
+            skippedFiles.add(fileName)
+          }
+        } catch {
+          case e: MalformedInputException =>
+            logWarning(
+              s"Unable to read a non UTF-8 encoded file 
${file.getAbsolutePath}. Skipping...", e)
+            None
+        } finally {
+          source.close()
+        }
+      }
+      if (truncatedMap.nonEmpty) {
+        logInfo(s"Spark configuration files loaded from $confDir :" +
+          s" ${truncatedMap.keys.mkString(",")}")
+      }
+      if (skippedFiles.nonEmpty) {
+        logWarning(s"Skipped conf file(s) ${skippedFiles.mkString(",")}, due 
to size constraint." +
+          s" Please see, config: `${Config.CONFIG_MAP_MAXSIZE.key}` for more 
details.")
+      }
+      truncatedMap.toMap
     } else {
       Map.empty[String, String]
     }
   }
 
-  private def listConfFiles(confDir: String): Seq[File] = {
-    // We exclude all the template files and user provided spark conf or 
properties.
-    // As spark properties are resolved in a different step.
+  private def listConfFiles(confDir: String, maxSize: Long): Seq[File] = {
+    // At the moment configmaps do not support storing binary content (i.e. 
skip jar,tar,gzip,zip),
+    // and configMaps do not allow for size greater than 1.5 MiB(configurable).
+    // https://etcd.io/docs/v3.4.0/dev-guide/limit/
+    def testIfTooLargeOrBinary(f: File): Boolean = (f.length() + 
f.getName.length > maxSize) ||
+      f.getName.matches(".*\\.(gz|zip|jar|tar)")
+
+    // We exclude all the template files and user provided spark conf or 
properties,
+    // Spark properties are resolved in a different step.
+    def testIfSparkConfOrTemplates(f: File) = 
f.getName.matches(".*\\.template") ||
+      f.getName.matches("spark.*(conf|properties)")
+
     val fileFilter = (f: File) => {
-      f.isFile && !(f.getName.endsWith("template") ||
-        f.getName.matches("spark.*(conf|properties)"))
+      f.isFile && !testIfTooLargeOrBinary(f) && !testIfSparkConfOrTemplates(f)
     }
     val confFiles: Seq[File] = {
       val dir = new File(confDir)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 1a14d52..18d0c00 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -191,25 +191,32 @@ class ClientSuite extends SparkFunSuite with 
BeforeAndAfter {
     
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
   }
 
-  test("All files from SPARK_CONF_DIR, except templates and spark config " +
+  test("All files from SPARK_CONF_DIR, " +
+    "except templates, spark config, binary files and are within size limit, " 
+
     "should be populated to pod's configMap.") {
     def testSetup: (SparkConf, Seq[String]) = {
       val tempDir = Utils.createTempDir()
-      val sparkConf = new SparkConf(loadDefaults = 
false).setSparkHome(tempDir.getAbsolutePath)
+      val sparkConf = new SparkConf(loadDefaults = false)
+        .setSparkHome(tempDir.getAbsolutePath)
 
       val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
       tempConfDir.mkdir()
       // File names - which should not get mounted on the resultant config map.
       val filteredConfFileNames =
-        Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf")
-      val confFileNames = for (i <- 1 to 5) yield s"testConf.$i" ++
+        Set("spark-env.sh.template", "spark.properties", "spark-defaults.conf",
+          "test.gz", "test2.jar", "non_utf8.txt")
+      val confFileNames = (for (i <- 1 to 5) yield s"testConf.$i") ++
         List("spark-env.sh") ++ filteredConfFileNames
 
-      val testConfFiles = for (i <- confFileNames) yield {
+      val testConfFiles = (for (i <- confFileNames) yield {
         val file = new File(s"${tempConfDir.getAbsolutePath}/$i")
-        Files.write(file.toPath, 
"conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
+        if (i.startsWith("non_utf8")) { // filling some non-utf-8 binary
+          Files.write(file.toPath, Array[Byte](0x00.toByte, 0xA1.toByte))
+        } else {
+          Files.write(file.toPath, 
"conf1key=conf1value".getBytes(StandardCharsets.UTF_8))
+        }
         file.getName
-      }
+      })
       assert(tempConfDir.listFiles().length == confFileNames.length)
       val expectedConfFiles: Seq[String] = 
testConfFiles.filterNot(filteredConfFileNames.contains)
       (sparkConf, expectedConfFiles)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
new file mode 100644
index 0000000..ee672cc
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtilsSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config
+import org.apache.spark.util.Utils
+
+class KubernetesClientUtilsSuite extends SparkFunSuite with BeforeAndAfter {
+
+  def testSetup(inputFiles: Map[String, Array[Byte]]): SparkConf = {
+    val tempDir = Utils.createTempDir()
+    val sparkConf = new SparkConf(loadDefaults = false)
+      .setSparkHome(tempDir.getAbsolutePath)
+
+    val tempConfDir = new File(s"${tempDir.getAbsolutePath}/conf")
+    tempConfDir.mkdir()
+    for (i <- inputFiles) yield {
+      val file = new File(s"${tempConfDir.getAbsolutePath}/${i._1}")
+      Files.write(file.toPath, i._2)
+      file.getName
+    }
+    sparkConf
+  }
+
+  test("verify load files, loads only allowed files and not the disallowed 
files.") {
+    val input: Map[String, Array[Byte]] = Map("test.txt" -> "test123", 
"z12.zip" -> "zZ",
+      "rere.jar" -> "@31", "spark.jar" -> "@31", "_test" -> "", "sample.conf" 
-> "conf")
+      .map(f => f._1 -> f._2.getBytes(StandardCharsets.UTF_8)) ++
+      Map("binary-file.conf" -> Array[Byte](0x00.toByte, 0xA1.toByte))
+    val sparkConf = testSetup(input)
+    val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+    val expectedOutput = Map("test.txt" -> "test123", "sample.conf" -> "conf", 
"_test" -> "")
+    assert(output === expectedOutput)
+  }
+
+  test("verify load files, truncates the content to maxSize, when keys are 
very large in number.") {
+    val input = (for (i <- 10000 to 1 by -1) yield (s"testConf.${i}" -> 
"test123456")).toMap
+    val sparkConf = testSetup(input.map(f => f._1 -> 
f._2.getBytes(StandardCharsets.UTF_8)))
+      .set(Config.CONFIG_MAP_MAXSIZE.key, "60")
+    val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+    val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" -> 
"test123456")
+    assert(output === expectedOutput)
+    val output1 = KubernetesClientUtils.loadSparkConfDirFiles(
+      sparkConf.set(Config.CONFIG_MAP_MAXSIZE.key, "250000"))
+    assert(output1 === input)
+  }
+
+  test("verify load files, truncates the content to maxSize, when keys are 
equal in length.") {
+    val input = (for (i <- 9 to 1 by -1) yield (s"testConf.${i}" -> 
"test123456")).toMap
+    val sparkConf = testSetup(input.map(f => f._1 -> 
f._2.getBytes(StandardCharsets.UTF_8)))
+      .set(Config.CONFIG_MAP_MAXSIZE.key, "80")
+    val output = KubernetesClientUtils.loadSparkConfDirFiles(sparkConf)
+    val expectedOutput = Map("testConf.1" -> "test123456", "testConf.2" -> 
"test123456",
+      "testConf.3" -> "test123456")
+    assert(output === expectedOutput)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to