Repository: spark Updated Branches: refs/heads/branch-2.4 8a2992ed4 -> 67f2cb6e0
[SPARK-25291][K8S] Fixing Flakiness of Executor Pod tests ## What changes were proposed in this pull request? Added fix to flakiness that was present in PySpark tests w.r.t Executors not being tested. Important fix to executorConf which was failing tests when executors *were* tested ## How was this patch tested? Unit and Integration tests Closes #22415 from ifilonenko/SPARK-25291. Authored-by: Ilan Filonenko <[email protected]> Signed-off-by: Yinan Li <[email protected]> (cherry picked from commit 123f0041d534f28e14343aafb4e5cec19dde14ad) Signed-off-by: Yinan Li <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67f2cb6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67f2cb6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67f2cb6e Branch: refs/heads/branch-2.4 Commit: 67f2cb6e0e9a0f19776eb6fca068dd4ca8e3fb24 Parents: 8a2992e Author: Ilan Filonenko <[email protected]> Authored: Tue Sep 18 11:43:35 2018 -0700 Committer: Yinan Li <[email protected]> Committed: Tue Sep 18 11:44:06 2018 -0700 ---------------------------------------------------------------------- .../k8s/integrationtest/KubernetesSuite.scala | 35 ++++++++++++++------ .../KubernetesTestComponents.scala | 1 - .../k8s/integrationtest/SecretsTestsSuite.scala | 2 +- 3 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 896a83a..ba38f14 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -23,7 +23,10 @@ import java.util.regex.Pattern import com.google.common.io.PatternFilenameFilter import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} +import org.scalatest.Matchers import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ @@ -31,10 +34,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite { + with PythonTestsSuite with ClientModeTestsSuite + with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -207,17 +212,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite .getItems .get(0) driverPodChecker(driverPod) - - val executorPods = kubernetesTestComponents.kubernetesClient + val execPods = scala.collection.mutable.Map[String, Pod]() + val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "executor") - .list() - .getItems - executorPods.asScala.foreach { pod => - executorPodChecker(pod) - } - + .watch(new Watcher[Pod] { + logInfo("Beginning watch of executors") + override def onClose(cause: KubernetesClientException): Unit = + logInfo("Ending watch of executors") + override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { + val name = resource.getMetadata.getName + action match { + case Action.ADDED | Action.MODIFIED => + execPods(name) = resource + case Action.DELETED | Action.ERROR => + execPods.remove(name) + } + } + }) + Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) } + execWatcher.close() + execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(TIMEOUT, INTERVAL) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient @@ -228,7 +244,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite } } } - protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) assert(driverPod.getSpec.getContainers.get(0).getImage === image) http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index b602fdf..5615d61 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -62,7 +62,6 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl new SparkAppConf() .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) - .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") .set("spark.app.name", "spark-test-app") http://git-wip-us.apache.org/repos/asf/spark/blob/67f2cb6e/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala index 7b05c13..b18a6ae 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder} +import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder} import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.output.ByteArrayOutputStream import org.scalatest.concurrent.Eventually --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
