Repository: spark Updated Branches: refs/heads/master 182da81e9 -> 123f0041d
[SPARK-25291][K8S] Fixing Flakiness of Executor Pod tests ## What changes were proposed in this pull request? Fixed flakiness in the PySpark tests, where executor pods were sometimes not checked at all because they had not yet been created when the test listed them. Also fixed the executor configuration (executorConf), which caused tests to fail once executors *were* being checked. ## How was this patch tested? Unit and integration tests. Closes #22415 from ifilonenko/SPARK-25291. Authored-by: Ilan Filonenko <[email protected]> Signed-off-by: Yinan Li <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/123f0041 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/123f0041 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/123f0041 Branch: refs/heads/master Commit: 123f0041d534f28e14343aafb4e5cec19dde14ad Parents: 182da81 Author: Ilan Filonenko <[email protected]> Authored: Tue Sep 18 11:43:35 2018 -0700 Committer: Yinan Li <[email protected]> Committed: Tue Sep 18 11:43:35 2018 -0700 ---------------------------------------------------------------------- .../k8s/integrationtest/KubernetesSuite.scala | 35 ++++++++++++++------ .../KubernetesTestComponents.scala | 1 - .../k8s/integrationtest/SecretsTestsSuite.scala | 3 +- 3 files changed, 26 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 18541ba..c99a907 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -23,7 +23,10 @@ import java.util.regex.Pattern import com.google.common.io.PatternFilenameFilter import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} +import org.scalatest.Matchers import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ @@ -31,10 +34,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite { + with PythonTestsSuite with ClientModeTestsSuite + with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -223,17 +228,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite .getItems .get(0) driverPodChecker(driverPod) - - val executorPods = kubernetesTestComponents.kubernetesClient + val execPods = scala.collection.mutable.Map[String, Pod]() + val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "executor") - .list() - .getItems - executorPods.asScala.foreach { pod => - executorPodChecker(pod) - } - + .watch(new Watcher[Pod] { + logInfo("Beginning watch of executors") + 
override def onClose(cause: KubernetesClientException): Unit = + logInfo("Ending watch of executors") + override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + val name = resource.getMetadata.getName + action match { + case Action.ADDED | Action.MODIFIED => + execPods(name) = resource + case Action.DELETED | Action.ERROR => + execPods.remove(name) + } + } + }) + Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) } + execWatcher.close() + execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(TIMEOUT, INTERVAL) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient @@ -244,7 +260,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite } } } - protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === image) http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index b602fdf..5615d61 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -62,7 +62,6 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl new SparkAppConf() .set("spark.master", 
s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) - .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") .set("spark.app.name", "spark-test-app") http://git-wip-us.apache.org/repos/asf/spark/blob/123f0041/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index 9b039bb..b18a6ae 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder} +import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder} import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.output.ByteArrayOutputStream import org.scalatest.concurrent.Eventually @@ -53,7 +53,6 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => .delete() } - // TODO: [SPARK-25291] This test is flaky with regards to memory of executors test("Run SparkPi with env and mount secrets.", k8sTestTag) { createTestSecret() sparkAppConf --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
