Repository: spark
Updated Branches:
  refs/heads/master 18cb0c079 -> 270a9a3ca


http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
new file mode 100644
index 0000000..0c19f59
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+import org.apache.spark.util.ManualClock
+
+class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val driverPodName = "driver"
+
+  private val driverPod = new PodBuilder()
+    .withNewMetadata()
+      .withName(driverPodName)
+      .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+      .withUid("driver-pod-uid")
+      .endMetadata()
+    .build()
+
+  private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000L)
+
+  private var waitForExecutorPodsClock: ManualClock = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var labeledPods: LABELED_PODS = _
+
+  @Mock
+  private var driverPodOperations: PodResource[Pod, DoneablePod] = _
+
+  @Mock
+  private var executorBuilder: KubernetesExecutorBuilder = _
+
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+  private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(driverPodOperations.get).thenReturn(driverPod)
+    when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields()))
+      .thenAnswer(executorPodAnswer())
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+    waitForExecutorPodsClock = new ManualClock(0L)
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      conf, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
+  }
+
+  test("Initially request executors in batches. Do not request another batch 
if the" +
+    " first has not finished.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (nextId <- 1 to podAllocationSize) {
+      verify(podOperations).create(podWithAttachedContainerForId(nextId))
+    }
+    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+  }
+
+  test("Request executors in batches. Allow another batch to be requested if" +
+    " all pending executors start running.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize + 1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (execId <- 1 until podAllocationSize) {
+      snapshotsStore.updatePod(runningExecutor(execId))
+    }
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    snapshotsStore.updatePod(runningExecutor(podAllocationSize))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+  }
+
+  test("When a current batch reaches error states immediately, re-request" +
+    " them on the next batch.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(podAllocationSize)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    for (execId <- 1 until podAllocationSize) {
+      snapshotsStore.updatePod(runningExecutor(execId))
+    }
+    val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+  }
+
+  test("When an executor is requested but the API does not report it in a 
reasonable time, retry" +
+    " requesting that executor.") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
+    when(podOperations.withLabel(SPARK_EXECUTOR_ID_LABEL, "1")).thenReturn(labeledPods)
+    snapshotsStore.notifySubscribers()
+    verify(labeledPods).delete()
+    verify(podOperations).create(podWithAttachedContainerForId(2))
+  }
+
+  private def executorPodAnswer(): Answer[SparkPod] = {
+    new Answer[SparkPod] {
+      override def answer(invocation: InvocationOnMock): SparkPod = {
+        val k8sConf = invocation.getArgumentAt(
+          0, classOf[KubernetesConf[KubernetesExecutorSpecificConf]])
+        executorPodWithId(k8sConf.roleSpecificConf.executorId.toInt)
+      }
+    }
+  }
+
+  private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
+    Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
+      override def matches(argument: scala.Any): Boolean = {
+        if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
+          false
+        } else {
+          val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
+          val executorSpecificConf = k8sConf.roleSpecificConf
+          val expectedK8sConf = KubernetesConf.createExecutorConf(
+            conf,
+            executorSpecificConf.executorId,
+            TEST_SPARK_APP_ID,
+            driverPod)
+          k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap &&
+            // Since KubernetesConf.createExecutorConf clones the SparkConf object, force
+            // deep equality comparison for the SparkConf object and use object equality
+            // comparison on all other fields.
+            k8sConf.copy(sparkConf = conf) == expectedK8sConf.copy(sparkConf = conf)
+        }
+      }
+    })
+
+}
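
The batching rule these tests pin down can be pictured with a minimal sketch
(requestBatch and its parameters are hypothetical names, not part of this
change; the real logic lives in ExecutorPodsAllocator and is driven by pod
snapshots):

    // Sketch only: request at most one batch at a time, and only once
    // nothing from the previous batch is still pending.
    def requestBatch(totalExpected: Int, running: Int, pending: Int, batchSize: Int): Int =
      if (pending > 0) 0
      else math.min(batchSize, totalExpected - running - pending)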

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
new file mode 100644
index 0000000..562ace9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.CacheBuilder
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var namedExecutorPods: mutable.Map[String, PodResource[Pod, DoneablePod]] = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var executorBuilder: KubernetesExecutorBuilder = _
+
+  @Mock
+  private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+  private var eventHandlerUnderTest: ExecutorPodsLifecycleManager = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    val removedExecutorsCache = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+    namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, DoneablePod]]
+    when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
+    eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
+      new SparkConf(),
+      executorBuilder,
+      kubernetesClient,
+      snapshotsStore,
+      removedExecutorsCache)
+    eventHandlerUnderTest.start(schedulerBackend)
+  }
+
+  test("When an executor reaches error states immediately, remove from the 
scheduler backend.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+    verify(namedExecutorPods(failedPod.getMetadata.getName)).delete()
+  }
+
+  test("Don't remove executors twice from Spark but remove from K8s 
repeatedly.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend, times(1)).doRemoveExecutor("1", expectedLossReason)
+    verify(namedExecutorPods(failedPod.getMetadata.getName), times(2)).delete()
+  }
+
+  test("When the scheduler backend lists executor ids that aren't present in 
the cluster," +
+    " remove those executors from Spark.") {
+    when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
+    val msg = s"The executor with ID 1 was not found in the cluster but we 
didn't" +
+      s" get a reason why. Marking the executor as failed. The executor may 
have been" +
+      s" deleted but the driver missed the deletion event."
+    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+  }
+
+  private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
+    s"""
+       |The executor with id $failedExecutorId exited with exit code 1.
+       |The API gave the following brief reason: ${failedPod.getStatus.getReason}
+       |The API gave the following message: ${failedPod.getStatus.getMessage}
+       |The API gave the following container statuses:
+       |
+       |${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
+      """.stripMargin
+  }
+
+  private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = {
+    new Answer[PodResource[Pod, DoneablePod]] {
+      override def answer(invocation: InvocationOnMock): PodResource[Pod, DoneablePod] = {
+        val podName = invocation.getArgumentAt(0, classOf[String])
+        namedExecutorPods.getOrElseUpdate(
+          podName, mock(classOf[PodResource[Pod, DoneablePod]]))
+      }
+    }
+  }
+}
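
The removedExecutorsCache built in the setup above is what keeps the Spark-side
removal idempotent while the Kubernetes delete call may repeat. A minimal
sketch of that pattern, with a hypothetical markRemoved helper:

    import com.google.common.cache.CacheBuilder

    val removed = CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
    // Returns true only the first time an executor id is seen, so a caller
    // would notify Spark once but could issue the pod delete on every snapshot.
    def markRemoved(execId: Long): Boolean =
      if (removed.getIfPresent(execId) == null) {
        removed.put(execId, execId)
        true
      } else {
        false
      }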

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
new file mode 100644
index 0000000..1b26d6a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import io.fabric8.kubernetes.api.model.PodListBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val sparkConf = new SparkConf
+
+  private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var appIdLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var executorRoleLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+  private var pollingExecutor: DeterministicScheduler = _
+  private var pollingSourceUnderTest: ExecutorPodsPollingSnapshotSource = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    pollingExecutor = new DeterministicScheduler()
+    pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
+      sparkConf,
+      kubernetesClient,
+      eventQueue,
+      pollingExecutor)
+    pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(appIdLabeledPods)
+    when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(executorRoleLabeledPods)
+  }
+
+  test("Items returned by the API should be pushed to the event queue") {
+    when(executorRoleLabeledPods.list())
+      .thenReturn(new PodListBuilder()
+        .addToItems(
+          runningExecutor(1),
+          runningExecutor(2))
+        .build())
+    pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
+    verify(eventQueue).replaceSnapshot(Seq(runningExecutor(1), runningExecutor(2)))
+  }
+}
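
DeterministicScheduler is what keeps this test single-threaded: work scheduled
on it runs only when the test advances virtual time. A small illustrative
usage, separate from the suite above:

    import java.util.concurrent.TimeUnit
    import org.jmock.lib.concurrent.DeterministicScheduler

    val scheduler = new DeterministicScheduler()
    scheduler.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = println("poll") },
      0L, 30000L, TimeUnit.MILLISECONDS)
    // Nothing has run yet; tick() executes every run that falls due within
    // the window, on the calling thread.
    scheduler.tick(30000L, TimeUnit.MILLISECONDS)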

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
new file mode 100644
index 0000000..70e19c9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsSnapshotSuite extends SparkFunSuite {
+
+  test("States are interpreted correctly from pod metadata.") {
+    val pods = Seq(
+      pendingExecutor(0),
+      runningExecutor(1),
+      succeededExecutor(2),
+      failedExecutorWithoutDeletion(3),
+      deletedExecutor(4),
+      unknownExecutor(5))
+    val snapshot = ExecutorPodsSnapshot(pods)
+    assert(snapshot.executorPods ===
+      Map(
+        0L -> PodPending(pods(0)),
+        1L -> PodRunning(pods(1)),
+        2L -> PodSucceeded(pods(2)),
+        3L -> PodFailed(pods(3)),
+        4L -> PodDeleted(pods(4)),
+        5L -> PodUnknown(pods(5))))
+  }
+
+  test("Updates add new pods for non-matching ids and edit existing pods for 
matching ids") {
+    val originalPods = Seq(
+      pendingExecutor(0),
+      runningExecutor(1))
+    val originalSnapshot = ExecutorPodsSnapshot(originalPods)
+    val snapshotWithUpdatedPod = originalSnapshot.withUpdate(succeededExecutor(1))
+    assert(snapshotWithUpdatedPod.executorPods ===
+      Map(
+        0L -> PodPending(originalPods(0)),
+        1L -> PodSucceeded(succeededExecutor(1))))
+    val snapshotWithNewPod = snapshotWithUpdatedPod.withUpdate(pendingExecutor(2))
+    assert(snapshotWithNewPod.executorPods ===
+      Map(
+        0L -> PodPending(originalPods(0)),
+        1L -> PodSucceeded(succeededExecutor(1)),
+        2L -> PodPending(pendingExecutor(2))))
+  }
+}
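
The semantics asserted here are those of a persistent map: withUpdate returns
a new snapshot that replaces the entry whose executor id matches and keeps
everything else. A minimal sketch with simplified state types (the real
ExecutorPodsSnapshot derives the state from pod metadata):

    case class Snapshot(executorPods: Map[Long, String]) {
      def withUpdate(id: Long, state: String): Snapshot =
        copy(executorPods = executorPods + (id -> state))
    }

    val s0 = Snapshot(Map(0L -> "pending", 1L -> "running"))
    val s1 = s0.withUpdate(1L, "succeeded") // edits id 1, keeps id 0 untouched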

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
new file mode 100644
index 0000000..cf54b3c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.scalatest.BeforeAndAfter
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+
+class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var eventBufferScheduler: DeterministicScheduler = _
+  private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
+
+  before {
+    eventBufferScheduler = new DeterministicScheduler()
+    eventQueueUnderTest = new ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
+  }
+
+  test("Subscribers get notified of events periodically.") {
+    val receivedSnapshots1 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    val receivedSnapshots2 = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedSnapshots1 ++= _
+    }
+    eventQueueUnderTest.addSubscriber(2000) {
+      receivedSnapshots2 ++= _
+    }
+
+    eventBufferScheduler.runUntilIdle()
+    assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
+    assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+    pushPodWithIndex(1)
+    // Force time to move forward so that the buffer is emitted, scheduling the
+    // processing task on the subscription executor...
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    // ... then actually execute the subscribers.
+
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
+
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+    // Don't repeat snapshots
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    assert(receivedSnapshots2 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    pushPodWithIndex(2)
+    pushPodWithIndex(3)
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+    assert(receivedSnapshots2 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots1 === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), podWithIndex(3)))))
+    assert(receivedSnapshots1 === receivedSnapshots2)
+  }
+
+  test("Even without sending events, initially receive an empty buffer.") {
+    val receivedInitialSnapshot = new AtomicReference[Seq[ExecutorPodsSnapshot]](null)
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedInitialSnapshot.set
+    }
+    assert(receivedInitialSnapshot.get == null)
+    eventBufferScheduler.runUntilIdle()
+    assert(receivedInitialSnapshot.get === Seq(ExecutorPodsSnapshot()))
+  }
+
+  test("Replacing the snapshot passes the new snapshot to subscribers.") {
+    val receivedSnapshots = mutable.Buffer.empty[ExecutorPodsSnapshot]
+    eventQueueUnderTest.addSubscriber(1000) {
+      receivedSnapshots ++= _
+    }
+    eventQueueUnderTest.updatePod(podWithIndex(1))
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+    eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
+    eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
+    assert(receivedSnapshots === Seq(
+      ExecutorPodsSnapshot(),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
+      ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
+  }
+
+  private def pushPodWithIndex(index: Int): Unit =
+    eventQueueUnderTest.updatePod(podWithIndex(index))
+
+  private def podWithIndex(index: Int): Pod =
+    new PodBuilder()
+      .editOrNewMetadata()
+        .withName(s"pod-$index")
+        .addToLabels(SPARK_EXECUTOR_ID_LABEL, index.toString)
+        .endMetadata()
+      .editOrNewStatus()
+        .withPhase("running")
+        .endStatus()
+      .build()
+}
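
The delivery model verified above, where each subscriber drains its own buffer
on its own cadence and never re-reads snapshots it has already seen, can be
sketched with a simplified store (TinySnapshotBus is hypothetical; the real
implementation schedules one drain task per subscriber):

    import scala.collection.mutable

    class TinySnapshotBus[T] {
      private val buffers = mutable.Buffer.empty[mutable.Buffer[T]]
      def addSubscriber(): mutable.Buffer[T] = {
        val buf = mutable.Buffer.empty[T]
        buffers += buf
        buf
      }
      // Every subscriber's buffer receives the snapshot; draining one buffer
      // leaves the others' queued snapshots untouched.
      def publish(snapshot: T): Unit = buffers.foreach(_ += snapshot)
    }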

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
new file mode 100644
index 0000000..ac1968b
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Mockito.{verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
+
+  @Mock
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var appIdLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var executorRoleLabeledPods: LABELED_PODS = _
+
+  @Mock
+  private var watchConnection: Watch = _
+
+  private var watch: ArgumentCaptor[Watcher[Pod]] = _
+
+  private var watchSourceUnderTest: ExecutorPodsWatchSnapshotSource = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(appIdLabeledPods)
+    when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(executorRoleLabeledPods)
+    when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
+    watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
+      eventQueue, kubernetesClient)
+    watchSourceUnderTest.start(TEST_SPARK_APP_ID)
+  }
+
+  test("Watch events should be pushed to the snapshots store as snapshot 
updates.") {
+    watch.getValue.eventReceived(Action.ADDED, runningExecutor(1))
+    watch.getValue.eventReceived(Action.MODIFIED, runningExecutor(2))
+    verify(eventQueue).updatePod(runningExecutor(1))
+    verify(eventQueue).updatePod(runningExecutor(2))
+  }
+}
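
For reference, a fabric8 Watcher like the one captured above receives the full
pod object with each event. This illustrative watcher (not part of the change)
just logs what the snapshot source would forward to the store:

    import io.fabric8.kubernetes.api.model.Pod
    import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
    import io.fabric8.kubernetes.client.Watcher.Action

    class LoggingPodWatcher extends Watcher[Pod] {
      override def eventReceived(action: Action, pod: Pod): Unit =
        println(s"$action -> ${pod.getMetadata.getName}")
      override def onClose(cause: KubernetesClientException): Unit =
        println(s"watch closed: $cause")
    }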

http://git-wip-us.apache.org/repos/asf/spark/blob/270a9a3c/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 96065e8..52e7a12 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -16,85 +16,36 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
-import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
-
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, DoneablePod, Pod, PodBuilder, PodList}
-import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
-import org.hamcrest.{BaseMatcher, Description, Matcher}
-import org.mockito.{AdditionalAnswers, ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
-import org.mockito.Matchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.jmock.lib.concurrent.DeterministicScheduler
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{eq => mockitoEq}
+import org.mockito.Mockito.{never, verify, when}
 import org.scalatest.BeforeAndAfter
-import org.scalatest.mockito.MockitoSugar._
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 
 class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
 
-  private val APP_ID = "test-spark-app"
-  private val DRIVER_POD_NAME = "spark-driver-pod"
-  private val NAMESPACE = "test-namespace"
-  private val SPARK_DRIVER_HOST = "localhost"
-  private val SPARK_DRIVER_PORT = 7077
-  private val POD_ALLOCATION_INTERVAL = "1m"
-  private val FIRST_EXECUTOR_POD = new PodBuilder()
-    .withNewMetadata()
-      .withName("pod1")
-      .endMetadata()
-    .withNewSpec()
-      .withNodeName("node1")
-      .endSpec()
-    .withNewStatus()
-      .withHostIP("192.168.99.100")
-      .endStatus()
-    .build()
-  private val SECOND_EXECUTOR_POD = new PodBuilder()
-    .withNewMetadata()
-      .withName("pod2")
-      .endMetadata()
-    .withNewSpec()
-      .withNodeName("node2")
-      .endSpec()
-    .withNewStatus()
-      .withHostIP("192.168.99.101")
-      .endStatus()
-    .build()
-
-  private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-  private type LABELED_PODS = FilterWatchListDeletable[
-    Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
-  private type IN_NAMESPACE_PODS = NonNamespaceOperation[
-    Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
-
-  @Mock
-  private var sparkContext: SparkContext = _
-
-  @Mock
-  private var listenerBus: LiveListenerBus = _
-
-  @Mock
-  private var taskSchedulerImpl: TaskSchedulerImpl = _
+  private val requestExecutorsService = new DeterministicScheduler()
+  private val sparkConf = new SparkConf(false)
+    .set("spark.executor.instances", "3")
 
   @Mock
-  private var allocatorExecutor: ScheduledExecutorService = _
+  private var sc: SparkContext = _
 
   @Mock
-  private var requestExecutorsService: ExecutorService = _
+  private var rpcEnv: RpcEnv = _
 
   @Mock
-  private var executorBuilder: KubernetesExecutorBuilder = _
+  private var driverEndpointRef: RpcEndpointRef = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -103,347 +54,97 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private var podOperations: PODS = _
 
   @Mock
-  private var podsWithLabelOperations: LABELED_PODS = _
+  private var labeledPods: LABELED_PODS = _
 
   @Mock
-  private var podsInNamespace: IN_NAMESPACE_PODS = _
+  private var taskScheduler: TaskSchedulerImpl = _
 
   @Mock
-  private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
+  private var eventQueue: ExecutorPodsSnapshotsStore = _
 
   @Mock
-  private var rpcEnv: RpcEnv = _
+  private var podAllocator: ExecutorPodsAllocator = _
 
   @Mock
-  private var driverEndpointRef: RpcEndpointRef = _
+  private var lifecycleEventHandler: ExecutorPodsLifecycleManager = _
 
   @Mock
-  private var executorPodsWatch: Watch = _
+  private var watchEvents: ExecutorPodsWatchSnapshotSource = _
 
   @Mock
-  private var successFuture: Future[Boolean] = _
+  private var pollEvents: ExecutorPodsPollingSnapshotSource = _
 
-  private var sparkConf: SparkConf = _
-  private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
-  private var allocatorRunnable: ArgumentCaptor[Runnable] = _
-  private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
-
-  private val driverPod = new PodBuilder()
-    .withNewMetadata()
-      .withName(DRIVER_POD_NAME)
-      .addToLabels(SPARK_APP_ID_LABEL, APP_ID)
-      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
-      .endMetadata()
-    .build()
+  private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
   before {
     MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf()
-      .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
-      .set(KUBERNETES_NAMESPACE, NAMESPACE)
-      .set("spark.driver.host", SPARK_DRIVER_HOST)
-      .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
-      .set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL)
-    executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
-    allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
-    requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+    when(taskScheduler.sc).thenReturn(sc)
+    when(sc.conf).thenReturn(sparkConf)
     driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
-    when(sparkContext.conf).thenReturn(sparkConf)
-    when(sparkContext.listenerBus).thenReturn(listenerBus)
-    when(taskSchedulerImpl.sc).thenReturn(sparkContext)
-    when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
-    when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
-      .thenReturn(executorPodsWatch)
-    when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
-    when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
-    when(podsWithDriverName.get()).thenReturn(driverPod)
-    when(allocatorExecutor.scheduleWithFixedDelay(
-      allocatorRunnable.capture(),
-      mockitoEq(0L),
-      mockitoEq(TimeUnit.MINUTES.toMillis(1)),
-      mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null)
-    // Creating Futures in Scala backed by a Java executor service resolves to running
-    // ExecutorService#execute (as opposed to submit)
-    doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
     when(rpcEnv.setupEndpoint(
       mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
-
-    // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
-    when(driverEndpointRef.ask[Boolean]
-      (any(classOf[Any]))
-      (any())).thenReturn(successFuture)
-    when(successFuture.failed).thenReturn(Future[Throwable] {
-      // emulate behavior of the Future.failed method.
-      throw new NoSuchElementException()
-    }(ThreadUtils.sameThread))
-  }
-
-  test("Basic lifecycle expectations when starting and stopping the 
scheduler.") {
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    assert(executorPodsWatcherArgument.getValue != null)
-    assert(allocatorRunnable.getValue != null)
-    scheduler.stop()
-    verify(executorPodsWatch).close()
-  }
-
-  test("Static allocation should request executors upon first allocator run.") 
{
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(firstResolvedPod)
-    verify(podOperations).create(secondResolvedPod)
-  }
-
-  test("Killing executors deletes the executor pods") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    allocatorRunnable.getValue.run()
-    scheduler.doKillExecutors(Seq("2"))
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations).delete(secondResolvedPod)
-    verify(podOperations, never()).delete(firstResolvedPod)
-  }
-
-  test("Executors should be requested in batches.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    requestExecutorRunnable.getValue.run()
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(firstResolvedPod)
-    verify(podOperations, never()).create(secondResolvedPod)
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(secondResolvedPod)
-  }
-
-  test("Scaled down executors should be cleaned up") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-
-    // The scheduler backend spins up one executor pod.
-    requestExecutorRunnable.getValue.run()
-    when(podOperations.create(any(classOf[Pod])))
-      .thenAnswer(AdditionalAnswers.returnsFirstArg())
-    val resolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 
9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-
-    // Request that there are 0 executors and trigger deletion from driver.
-    scheduler.doRequestTotalExecutors(0)
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    scheduler.doKillExecutors(Seq("1"))
-    requestExecutorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations, times(1)).delete(resolvedPod)
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-
-    val exitedPod = exitPod(resolvedPod, 0)
-    executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
-    allocatorRunnable.getValue.run()
-
-    // No more deletion attempts of the executors.
-    // This is graceful termination and should not be detected as a failure.
-    verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).send(
-      RemoveExecutor("1", ExecutorExited(
-        0,
-        exitCausedByApp = false,
-        s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
-          s" explicit termination request.")))
-  }
-
-  test("Executors that fail should not be deleted.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 
9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-    executorPodsWatcherArgument.getValue.eventReceived(
-      Action.ERROR, exitPod(firstResolvedPod, 1))
-
-    // A replacement executor should be created but the error pod should persist.
-    val replacementPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    scheduler.doRequestTotalExecutors(1)
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getAllValues.asScala.last.run()
-    verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).send(
-      RemoveExecutor("1", ExecutorExited(
-        1,
-        exitCausedByApp = true,
-        s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container 
exited with" +
-          " exit status code 1.")))
-  }
-
-  test("Executors disconnected due to unknown reasons are deleted and 
replaced.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val executorLostReasonCheckMaxAttempts = sparkConf.get(
-      KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
-
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    val executorEndpointRef = mock[RpcEndpointRef]
-    when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 
9000))
-    val registerFirstExecutorMessage = RegisterExecutor(
-      "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
-    when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
-    driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
-      .apply(registerFirstExecutorMessage)
-
-    driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-    1 to executorLostReasonCheckMaxAttempts foreach { _ =>
-      allocatorRunnable.getValue.run()
-      verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
+      taskScheduler,
+      rpcEnv,
+      kubernetesClient,
+      requestExecutorsService,
+      eventQueue,
+      podAllocator,
+      lifecycleEventHandler,
+      watchEvents,
+      pollEvents) {
+      override def applicationId(): String = TEST_SPARK_APP_ID
     }
-
-    val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).send(
-      RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
-  test("Executors that fail to start on the Kubernetes API call rebuild in the 
next batch.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(firstResolvedPod))
-      .thenThrow(new RuntimeException("test"))
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    verify(podOperations, times(1)).create(firstResolvedPod)
-    val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(recreatedResolvedPod)
+  test("Start all components") {
+    schedulerBackendUnderTest.start()
+    verify(podAllocator).setTotalExpectedExecutors(3)
+    verify(podAllocator).start(TEST_SPARK_APP_ID)
+    verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
+    verify(watchEvents).start(TEST_SPARK_APP_ID)
+    verify(pollEvents).start(TEST_SPARK_APP_ID)
   }
 
-  test("Executors that are initially created but the watch notices them fail 
are rebuilt" +
-    " in the next batch.") {
-    sparkConf
-      .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
-      .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
-    val scheduler = newSchedulerBackend()
-    scheduler.start()
-    val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
-    when(podOperations.create(FIRST_EXECUTOR_POD)).thenAnswer(AdditionalAnswers.returnsFirstArg())
-    requestExecutorRunnable.getValue.run()
-    allocatorRunnable.getValue.run()
-    verify(podOperations, times(1)).create(firstResolvedPod)
-    executorPodsWatcherArgument.getValue.eventReceived(Action.ERROR, firstResolvedPod)
-    val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
-    allocatorRunnable.getValue.run()
-    verify(podOperations).create(recreatedResolvedPod)
+  test("Stop all components") {
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    schedulerBackendUnderTest.stop()
+    verify(eventQueue).stop()
+    verify(watchEvents).stop()
+    verify(pollEvents).stop()
+    verify(labeledPods).delete()
+    verify(kubernetesClient).close()
   }
 
-  private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
-    new KubernetesClusterSchedulerBackend(
-      taskSchedulerImpl,
-      rpcEnv,
-      executorBuilder,
-      kubernetesClient,
-      allocatorExecutor,
-      requestExecutorsService) {
-
-      override def applicationId(): String = APP_ID
-    }
+  test("Remove executor") {
+    schedulerBackendUnderTest.start()
+    schedulerBackendUnderTest.doRemoveExecutor(
+      "1", ExecutorKilled)
+    verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
   }
 
-  private def exitPod(basePod: Pod, exitCode: Int): Pod = {
-    new PodBuilder(basePod)
-      .editStatus()
-        .addNewContainerStatus()
-          .withNewState()
-            .withNewTerminated()
-              .withExitCode(exitCode)
-              .endTerminated()
-            .endState()
-          .endContainerStatus()
-        .endStatus()
-      .build()
+  test("Kill executors") {
+    schedulerBackendUnderTest.start()
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
+    schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
+    verify(labeledPods, never()).delete()
+    requestExecutorsService.runNextPendingCommand()
+    verify(labeledPods).delete()
   }
 
-  private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Pod = {
-    val resolvedPod = new PodBuilder(expectedPod)
-      .editMetadata()
-        .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
-        .endMetadata()
-      .build()
-    val resolvedContainer = new ContainerBuilder().build()
-    when(executorBuilder.buildFromFeatures(Matchers.argThat(
-      new BaseMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
-        override def matches(argument: scala.Any)
-          : Boolean = {
-          argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] &&
-            argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
-              .roleSpecificConf.executorId == executorId.toString
-        }
-
-        override def describeTo(description: Description): Unit = {}
-      }))).thenReturn(SparkPod(resolvedPod, resolvedContainer))
-    new PodBuilder(resolvedPod)
-      .editSpec()
-        .addToContainers(resolvedContainer)
-        .endSpec()
-      .build()
+  test("Request total executors") {
+    schedulerBackendUnderTest.start()
+    schedulerBackendUnderTest.doRequestTotalExecutors(5)
+    verify(podAllocator).setTotalExpectedExecutors(3)
+    verify(podAllocator, never()).setTotalExpectedExecutors(5)
+    requestExecutorsService.runNextPendingCommand()
+    verify(podAllocator).setTotalExpectedExecutors(5)
   }
+
 }
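
The "Kill executors" test above hinges on the kill being deferred: nothing is
deleted until requestExecutorsService runs the queued task, which is why the
never().delete() check passes before runNextPendingCommand(). A minimal sketch
of that hand-off, with a hypothetical killAsync helper:

    import java.util.concurrent.ExecutorService

    def killAsync(service: ExecutorService, ids: Seq[String])(delete: Seq[String] => Unit): Unit =
      service.execute(new Runnable {
        // Runs only when the service executes the task; the test drives this
        // with DeterministicScheduler.runNextPendingCommand().
        override def run(): Unit = delete(ids)
      })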

