This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2825237 [SPARK-31062][K8S][TESTS] Improve spark decommissioning k8s test reliability
2825237 is described below
commit 28252374482cd212bc9f4c15b9ee4b875d0e7caf
Author: Holden Karau <[email protected]>
AuthorDate: Wed Mar 11 14:42:31 2020 -0700
[SPARK-31062][K8S][TESTS] Improve spark decommissioning k8s test reliability
### What changes were proposed in this pull request?
Replace a sleep with waiting for the first collect to happen, to make the K8s test code more reliable.
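In short, the fixed `Thread.sleep(2000)` before deleting the executor pod is replaced by polling the driver log with `Eventually` until the job reports its first collect. A condensed before/after sketch, using the log string and names from the diff below (`kubernetesTestComponents`, `driverPodName`, `TIMEOUT`, and `INTERVAL` are members of `KubernetesSuite`):

```scala
import org.scalatest.concurrent.Eventually

// Before: give the job an arbitrary head start and hope it is enough.
Thread.sleep(2000)

// After: block until the driver log shows the first collect has finished;
// only then is it safe to delete the executor pod.
Eventually.eventually(TIMEOUT, INTERVAL) {
  assert(kubernetesTestComponents.kubernetesClient
    .pods()
    .withName(driverPodName)
    .getLog
    .contains("Waiting to give nodes time to finish."),
    "Decommission test did not complete first collect.")
}
```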
### Why are the changes needed?
Currently, the Decommissioning test appears to be flaky in Jenkins.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Ran the K8s test suite in a loop on minikube on my desktop for 10 iterations without this test failing on any of the runs.
Closes #27858 from holdenk/SPARK-31062-Improve-Spark-Decommissioning-K8s-test-teliability.
Authored-by: Holden Karau <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
---
.../k8s/integrationtest/DecommissionSuite.scala | 5 +--
.../k8s/integrationtest/KubernetesSuite.scala | 40 +++++++++-------------
2 files changed, 20 insertions(+), 25 deletions(-)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index f5eab6e..becf941 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -32,8 +32,9 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_DECOMISSIONING,
mainClass = "",
- expectedLogOnCompletion = Seq("decommissioning executor",
- "Finished waiting, stopping Spark"),
+ expectedLogOnCompletion = Seq(
+ "Finished waiting, stopping Spark",
+ "decommissioning executor"),
appArgs = Array.empty[String],
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 61e1f27..eaaf67d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -298,6 +298,7 @@ class KubernetesSuite extends SparkFunSuite
.headOption.getOrElse(false)
result
}
+
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", appLocator)
@@ -316,17 +317,25 @@ class KubernetesSuite extends SparkFunSuite
logDebug(s"Add event received for $name.")
execPods(name) = resource
// If testing decommissioning start a thread to simulate
- // decommissioning.
+ // decommissioning on the first exec pod.
if (decommissioningTest && execPods.size == 1) {
// Wait for all the containers in the pod to be running
- logDebug("Waiting for first pod to become OK prior to
deletion")
+ logDebug("Waiting for pod to become OK prior to deletion")
Eventually.eventually(patienceTimeout, patienceInterval) {
val result = checkPodReady(namespace, name)
result shouldBe (true)
}
- // Sleep a small interval to allow execution of job
- logDebug("Sleeping before killing pod.")
- Thread.sleep(2000)
+ // Look for the string that indicates we're good to clean up
+ // on the driver
+ logDebug("Waiting for first collect...")
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ .getLog
+ .contains("Waiting to give nodes time to finish."),
+ "Decommission test did not complete first collect.")
+ }
// Delete the pod to simulate cluster scale down/migration.
val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name)
pod.delete()
@@ -361,23 +370,6 @@ class KubernetesSuite extends SparkFunSuite
Eventually.eventually(patienceTimeout, patienceInterval) {
execPods.values.nonEmpty should be (true)
}
- // If decommissioning we need to wait and check the executors were removed
- if (decommissioningTest) {
- // Sleep a small interval to ensure everything is registered.
- Thread.sleep(100)
- // Wait for the executors to become ready
- Eventually.eventually(patienceTimeout, patienceInterval) {
- val anyReadyPods = ! execPods.map{
- case (name, resource) =>
- (name, resource.getMetadata().getNamespace())
- }.filter{
- case (name, namespace) => checkPodReady(namespace, name)
- }.isEmpty
- val podsEmpty = execPods.values.isEmpty
- val podsReadyOrDead = anyReadyPods || podsEmpty
- podsReadyOrDead shouldBe (true)
- }
- }
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
Eventually.eventually(patienceTimeout, patienceInterval) {
@@ -386,10 +378,12 @@ class KubernetesSuite extends SparkFunSuite
.pods()
.withName(driverPod.getMetadata.getName)
.getLog
- .contains(e), "The application did not complete.")
+ .contains(e),
+ s"The application did not complete, did not find str ${e}")
}
}
}
+
protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === image)
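Taken together, the decommissioning branch of the executor pod watcher now reads roughly as below. This is a condensed sketch assembled from the hunks above, not the literal suite code: the action matching and some `execPods` bookkeeping are simplified, and imports and indentation are assumed.

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import org.scalatest.concurrent.Eventually

val execWatcher = kubernetesTestComponents.kubernetesClient
  .pods()
  .withLabel("spark-app-locator", appLocator)
  .watch(new Watcher[Pod] {
    override def onClose(cause: KubernetesClientException): Unit = {}

    override def eventReceived(action: Watcher.Action, resource: Pod): Unit = {
      val name = resource.getMetadata.getName
      val namespace = resource.getMetadata.getNamespace
      execPods(name) = resource
      // Simulate decommissioning only for the first executor pod, and only
      // when running the decommissioning test.
      if (decommissioningTest && execPods.size == 1) {
        // 1. Wait for all the containers in the pod to be running.
        Eventually.eventually(patienceTimeout, patienceInterval) {
          assert(checkPodReady(namespace, name))
        }
        // 2. Wait for the driver to log that the first collect finished
        //    (this replaces the old Thread.sleep(2000)).
        Eventually.eventually(TIMEOUT, INTERVAL) {
          assert(kubernetesTestComponents.kubernetesClient
            .pods()
            .withName(driverPodName)
            .getLog
            .contains("Waiting to give nodes time to finish."))
        }
        // 3. Delete the pod to simulate cluster scale down/migration.
        kubernetesTestComponents.kubernetesClient.pods().withName(name).delete()
      }
    }
  })
```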
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]