This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 699b9a5ff06 [SPARK-43319][K8S][TEST] Remove usage of deprecated
DefaultKubernetesClient
699b9a5ff06 is described below
commit 699b9a5ff06c9640ff25ddd4dd47084781b556b6
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 3 11:16:35 2023 -0700
[SPARK-43319][K8S][TEST] Remove usage of deprecated DefaultKubernetesClient
### What changes were proposed in this pull request?
Migrate from the deprecated `DefaultKubernetesClient` class to the recommended
`KubernetesClient` interface, with instances created via `KubernetesClientBuilder`.
Note: fabric8io/kubernetes-client changes rapidly, and there are still many
deprecated API usages in the codebase; I would like to migrate them in
separate PRs.
### Why are the changes needed?
```
/**
 * Class for Default Kubernetes Client implementing KubernetesClient interface.
 * It is thread safe.
 *
 * @deprecated direct usage should no longer be needed. Please use the
 * {@link KubernetesClientBuilder} instead.
 */
@Deprecated
public class DefaultKubernetesClient ...
```
```
public interface StorageAPIGroupDSL extends Client {
  /**
   * DSL entrypoint for storage.k8s.io/v1 StorageClass
   *
   * @deprecated Use <code>client.storage().v1().storageClasses()</code> instead
   * @return {@link NonNamespaceOperation} for StorageClass
   */
  @Deprecated
  NonNamespaceOperation<StorageClass, StorageClassList, Resource<StorageClass>> storageClasses();
  ...
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes the existing GA (GitHub Actions) CI checks.
Closes #40994 from pan3793/SPARK-43319.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/integrationtest/BasicTestsSuite.scala | 10 ++--
.../k8s/integrationtest/ClientModeTestsSuite.scala | 4 +-
.../k8s/integrationtest/DecommissionSuite.scala | 5 +-
.../k8s/integrationtest/DepsTestsSuite.scala | 4 ++
.../k8s/integrationtest/KubernetesSuite.scala | 28 +++++++++--
.../integrationtest/KubernetesTestComponents.scala | 11 ++---
.../deploy/k8s/integrationtest/PVTestsSuite.scala | 4 ++
.../k8s/integrationtest/SecretsTestsSuite.scala | 2 +
.../spark/deploy/k8s/integrationtest/Utils.scala | 1 +
.../k8s/integrationtest/VolcanoTestsSuite.scala | 57 ++++++++++++++++------
.../backend/IntegrationTestBackend.scala | 4 +-
.../backend/cloud/KubeConfigBackend.scala | 8 +--
.../backend/minikube/Minikube.scala | 7 ++-
.../backend/minikube/MinikubeTestBackend.scala | 6 +--
14 files changed, 109 insertions(+), 42 deletions(-)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 3281c69fe90..66f2ae4924c 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -58,9 +58,13 @@ private[spark] trait BasicTestsSuite { k8sSuite:
KubernetesSuite =>
// Verify there is no dangling statefulset
// This depends on the garbage collection happening inside of K8s so give
it some time.
Eventually.eventually(TIMEOUT, INTERVAL) {
- val sets =
kubernetesTestComponents.kubernetesClient.apps().statefulSets().list().getItems
- val scalaSets = sets.asScala
- scalaSets.size shouldBe (0)
+ val sets = kubernetesTestComponents.kubernetesClient
+ .apps()
+ .statefulSets()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .list()
+ .getItems
+ sets.asScala.size shouldBe 0
}
}
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 93200ea1297..074e857152e 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -53,8 +53,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite:
KubernetesSuite =>
.endSpec()
.build())
try {
- val driverPod = testBackend
- .getKubernetesClient
+ val driverPod = testBackend.getKubernetesClient
.pods()
.inNamespace(kubernetesTestComponents.namespace)
.create(new PodBuilder()
@@ -98,6 +97,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite:
KubernetesSuite =>
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(driverPodName)
.getLog
.contains("Pi is roughly 3"), "The application did not complete.")
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 81f4660afe9..aea4486e0e2 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -156,7 +156,10 @@ private[spark] trait DecommissionSuite { k8sSuite:
KubernetesSuite =>
PatienceConfiguration.Timeout(Span(120, Seconds)),
PatienceConfiguration.Interval(Span(1, Seconds))) {
- val currentPod =
client.pods().withName(pod.getMetadata.getName).get
+ val currentPod = client.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withName(pod.getMetadata.getName)
+ .get
val labels = currentPod.getMetadata.getLabels.asScala
labels should not be (null)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index 2e517bfaa11..8b94a65264a 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -129,6 +129,7 @@ private[spark] trait DepsTestsSuite { k8sSuite:
KubernetesSuite =>
Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
.kubernetesClient
.services()
+ .inNamespace(kubernetesTestComponents.namespace)
.create(minioService))
// try until the stateful set of a previous test is deleted
@@ -136,6 +137,7 @@ private[spark] trait DepsTestsSuite { k8sSuite:
KubernetesSuite =>
.kubernetesClient
.apps()
.statefulSets()
+ .inNamespace(kubernetesTestComponents.namespace)
.create(minioStatefulSet))
}
@@ -144,6 +146,7 @@ private[spark] trait DepsTestsSuite { k8sSuite:
KubernetesSuite =>
.kubernetesClient
.apps()
.statefulSets()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(cName)
.withGracePeriod(0)
.delete()
@@ -151,6 +154,7 @@ private[spark] trait DepsTestsSuite { k8sSuite:
KubernetesSuite =>
kubernetesTestComponents
.kubernetesClient
.services()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(svcName)
.withGracePeriod(0)
.delete()
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 78839ee6103..19d46b1e194 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -81,6 +81,7 @@ class KubernetesSuite extends SparkFunSuite
logInfo("END DESCRIBE PODS for the application")
val driverPodOption = kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "driver")
.list()
@@ -91,12 +92,14 @@ class KubernetesSuite extends SparkFunSuite
logInfo("BEGIN driver POD log\n" +
kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(driverPod.getMetadata.getName)
.getLog)
logInfo("END driver POD log")
}
kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.list()
@@ -104,6 +107,7 @@ class KubernetesSuite extends SparkFunSuite
val podLog = try {
kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(execPod.getMetadata.getName)
.getLog
} catch {
@@ -318,6 +322,7 @@ class KubernetesSuite extends SparkFunSuite
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "driver")
.list()
@@ -329,6 +334,7 @@ class KubernetesSuite extends SparkFunSuite
expectedJVMValue.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(driverPod.getMetadata.getName)
.getLog
.contains(e), "The application did not complete.")
@@ -385,6 +391,7 @@ class KubernetesSuite extends SparkFunSuite
val execWatcher = kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
.withLabel("spark-role", "executor")
.watch(new Watcher[Pod] {
@@ -415,6 +422,7 @@ class KubernetesSuite extends SparkFunSuite
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(driverPodName)
.getLog
.contains("Waiting to give nodes time to finish migration,
decom exec 1."),
@@ -425,8 +433,12 @@ class KubernetesSuite extends SparkFunSuite
// We set an intentionally long grace period to test that Spark
// exits once the blocks are done migrating and doesn't wait
for the
// entire grace period if it does not need to.
- kubernetesTestComponents.kubernetesClient.pods()
- .withName(name).withGracePeriod(Int.MaxValue).delete()
+ kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withName(name)
+ .withGracePeriod(Int.MaxValue)
+ .delete()
logDebug(s"Triggered pod decom/delete: $name deleted")
// Make sure this pod is deleted
Eventually.eventually(TIMEOUT, INTERVAL) {
@@ -458,6 +470,7 @@ class KubernetesSuite extends SparkFunSuite
val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
.withLabel("spark-role", "driver")
.list()
@@ -477,6 +490,7 @@ class KubernetesSuite extends SparkFunSuite
val execPod: Option[Pod] = if (expectedExecutorLogOnCompletion.nonEmpty) {
Some(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.list()
@@ -490,6 +504,7 @@ class KubernetesSuite extends SparkFunSuite
expectedDriverLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(driverPod.getMetadata.getName)
.getLog
.contains(e),
@@ -498,6 +513,7 @@ class KubernetesSuite extends SparkFunSuite
expectedExecutorLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(execPod.get.getMetadata.getName)
.getLog
.contains(e),
@@ -589,7 +605,11 @@ class KubernetesSuite extends SparkFunSuite
}
private def deleteDriverPod(): Unit = {
-
kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
+ kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withName(driverPodName)
+ .delete()
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
@@ -602,12 +622,14 @@ class KubernetesSuite extends SparkFunSuite
kubernetesTestComponents
.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.delete()
Eventually.eventually(TIMEOUT, INTERVAL) {
assert(kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-app-locator", appLocator)
.withLabel("spark-role", "executor")
.list()
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 4fdb89eab6e..4aba11bdb9d 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import io.fabric8.kubernetes.api.model.NamespaceBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
@@ -33,7 +33,7 @@ import org.apache.spark.internal.config.JARS
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI.UI_ENABLED
-private[spark] class KubernetesTestComponents(defaultClient:
DefaultKubernetesClient) {
+private[spark] class KubernetesTestComponents(val kubernetesClient:
KubernetesClient) {
val namespaceOption = Option(System.getProperty(CONFIG_KEY_KUBE_NAMESPACE))
val hasUserSpecifiedNamespace = namespaceOption.isDefined
@@ -42,11 +42,10 @@ private[spark] class
KubernetesTestComponents(defaultClient: DefaultKubernetesCl
val serviceAccountName =
Option(System.getProperty(CONFIG_KEY_KUBE_SVC_ACCOUNT))
.getOrElse("default")
- val kubernetesClient = defaultClient.inNamespace(namespace)
val clientConfig = kubernetesClient.getConfiguration
def createNamespace(): Unit = {
- defaultClient.namespaces.create(new NamespaceBuilder()
+ kubernetesClient.namespaces.create(new NamespaceBuilder()
.withNewMetadata()
.withName(namespace)
.endMetadata()
@@ -54,9 +53,9 @@ private[spark] class KubernetesTestComponents(defaultClient:
DefaultKubernetesCl
}
def deleteNamespace(): Unit = {
- defaultClient.namespaces.withName(namespace).delete()
+ kubernetesClient.namespaces.withName(namespace).delete()
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
- val namespaceList = defaultClient
+ val namespaceList = kubernetesClient
.namespaces()
.list()
.getItems
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
index 0362b348e13..1d373f3f806 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -41,6 +41,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite
=>
kubernetesTestComponents
.kubernetesClient
.storage()
+ .v1()
.storageClasses()
.create(scBuilder.build())
} catch {
@@ -98,6 +99,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite
=>
kubernetesTestComponents
.kubernetesClient
.persistentVolumeClaims()
+ .inNamespace(kubernetesTestComponents.namespace)
.create(pvcBuilder.build())
}
@@ -105,6 +107,7 @@ private[spark] trait PVTestsSuite { k8sSuite:
KubernetesSuite =>
kubernetesTestComponents
.kubernetesClient
.persistentVolumeClaims()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(PVC_NAME)
.delete()
@@ -117,6 +120,7 @@ private[spark] trait PVTestsSuite { k8sSuite:
KubernetesSuite =>
kubernetesTestComponents
.kubernetesClient
.storage()
+ .v1()
.storageClasses()
.withName(STORAGE_NAME)
.delete()
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index 39ae94b17c6..1d999e701af 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -44,6 +44,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite:
KubernetesSuite =>
val sec = kubernetesTestComponents
.kubernetesClient
.secrets()
+ .inNamespace(kubernetesTestComponents.namespace)
.createOrReplace(envSecret)
}
@@ -51,6 +52,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite:
KubernetesSuite =>
kubernetesTestComponents
.kubernetesClient
.secrets()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(ENV_SECRET_NAME)
.delete()
}
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 1f39feef42d..7df8df0fb79 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -55,6 +55,7 @@ object Utils extends Logging {
val pod = kubernetesTestComponents
.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withName(podName)
// Avoid timing issues by looking for open/close
class ReadyListener extends ExecListener {
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index f37a7644a94..06d6f7dc100 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -28,7 +28,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import io.fabric8.kubernetes.api.model.{HasMetadata, Pod, Quantity}
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.volcano.client.VolcanoClient
import io.fabric8.volcano.scheduling.v1beta1.{Queue, QueueBuilder}
import org.scalatest.BeforeAndAfterEach
@@ -46,9 +45,6 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
import
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag,
INTERVAL, TIMEOUT,
SPARK_DRIVER_MAIN_CLASS}
- lazy val volcanoClient: VolcanoClient
- = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
- lazy val k8sClient: NamespacedKubernetesClient =
kubernetesTestComponents.kubernetesClient
private val testGroups: mutable.Set[String] = mutable.Set.empty
private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty
@@ -58,9 +54,19 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
private def deletePodInTestGroup(): Unit = {
testGroups.foreach { g =>
- k8sClient.pods().withLabel("spark-group-locator", g).delete()
+ kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withLabel("spark-group-locator", g)
+ .delete()
Eventually.eventually(TIMEOUT, INTERVAL) {
- assert(k8sClient.pods().withLabel("spark-group-locator",
g).list().getItems.isEmpty)
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withLabel("spark-group-locator", g)
+ .list()
+ .getItems
+ .isEmpty)
}
}
testGroups.clear()
@@ -70,7 +76,10 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
testYAMLPaths.foreach { yaml =>
deleteYAMLResource(yaml)
Eventually.eventually(TIMEOUT, INTERVAL) {
- val resources = k8sClient.load(new
FileInputStream(yaml)).fromServer.get.asScala
+ val resources = kubernetesTestComponents.kubernetesClient
+ .load(new FileInputStream(yaml))
+ .inNamespace(kubernetesTestComponents.namespace)
+ .get.asScala
// Make sure all elements are null (no specific resources in cluster)
resources.foreach { r => assert(r === null) }
}
@@ -80,9 +89,15 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
private def deleteResources(): Unit = {
testResources.foreach { _ =>
- k8sClient.resourceList(testResources.toSeq: _*).delete()
+ kubernetesTestComponents.kubernetesClient
+ .resourceList(testResources.toSeq: _*)
+ .inNamespace(kubernetesTestComponents.namespace)
+ .delete()
Eventually.eventually(TIMEOUT, INTERVAL) {
- val resources = k8sClient.resourceList(testResources.toSeq:
_*).fromServer.get.asScala
+ val resources = kubernetesTestComponents.kubernetesClient
+ .resourceList(testResources.toSeq: _*)
+ .inNamespace(kubernetesTestComponents.namespace)
+ .get().asScala
// Make sure all elements are null (no specific resources in cluster)
resources.foreach { r => assert(r === null) }
}
@@ -120,7 +135,11 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
priorityClassName: Option[String] = None): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val podGroupName = s"$appId-podgroup"
- val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
+ val podGroup =
kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+ .podGroups()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .withName(podGroupName)
+ .get()
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName ===
pod.getMetadata.getName)
queue.foreach(q => assert(q === podGroup.getSpec.getQueue))
priorityClassName.foreach(_ =>
@@ -128,7 +147,10 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
}
private def createOrReplaceResource(resource: Queue): Unit = {
- volcanoClient.queues().createOrReplace(resource)
+ kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+ .queues()
+ .inNamespace(kubernetesTestComponents.namespace)
+ .createOrReplace(resource)
testResources += resource
}
@@ -152,20 +174,27 @@ private[spark] trait VolcanoTestsSuite extends
BeforeAndAfterEach { k8sSuite: Ku
}
private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
- k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+ kubernetesTestComponents.kubernetesClient
+ .load(new FileInputStream(yamlPath))
+ .inNamespace(kubernetesTestComponents.namespace)
+ .createOrReplace()
testYAMLPaths += yamlPath
}
private def deleteYAMLResource(yamlPath: String): Unit = {
- k8sClient.load(new FileInputStream(yamlPath)).delete()
+ kubernetesTestComponents.kubernetesClient
+ .load(new FileInputStream(yamlPath))
+ .inNamespace(kubernetesTestComponents.namespace)
+ .delete()
}
private def getPods(
role: String,
groupLocator: String,
statusPhase: String): mutable.Buffer[Pod] = {
- k8sClient
+ kubernetesTestComponents.kubernetesClient
.pods()
+ .inNamespace(kubernetesTestComponents.namespace)
.withLabel("spark-group-locator", groupLocator)
.withLabel("spark-role", role)
.withField("status.phase", statusPhase)
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
index ced8151b709..f767f9be55a 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.integrationtest.backend
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
@@ -27,7 +27,7 @@ import
org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTest
private[spark] trait IntegrationTestBackend {
def initialize(): Unit
- def getKubernetesClient: DefaultKubernetesClient
+ def getKubernetesClient: KubernetesClient
def cleanUp(): Unit = {}
def describePods(labels: String): Seq[String] =
ProcessUtils.executeProcess(
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
index b2d11828232..aa34cc617d6 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.cloud
-import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient,
KubernetesClientBuilder}
import io.fabric8.kubernetes.client.utils.Utils
import org.apache.commons.lang3.StringUtils
@@ -31,7 +31,7 @@ private[spark] class KubeConfigBackend(var context: String)
s"${if (context != null) s"context ${context}" else "default context"}" +
s" from users K8S config file")
- private var defaultClient: DefaultKubernetesClient = _
+ private var defaultClient: KubernetesClient = _
override def initialize(): Unit = {
// Auto-configure K8S client from K8S config file
@@ -55,7 +55,7 @@ private[spark] class KubeConfigBackend(var context: String)
}
}
- defaultClient = new DefaultKubernetesClient(config)
+ defaultClient = new KubernetesClientBuilder().withConfig(config).build()
}
override def cleanUp(): Unit = {
@@ -65,7 +65,7 @@ private[spark] class KubeConfigBackend(var context: String)
super.cleanUp()
}
- override def getKubernetesClient: DefaultKubernetesClient = {
+ override def getKubernetesClient: KubernetesClient = {
defaultClient
}
}
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 70a849c37e4..1d2f27fe7b9 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,8 +16,7 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
-import io.fabric8.kubernetes.client.Config
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.{Config, KubernetesClient,
KubernetesClientBuilder}
import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
import org.apache.spark.internal.Logging
@@ -40,7 +39,7 @@ private[spark] object Minikube extends Logging {
def logVersion(): Unit =
logInfo(minikubeVersionString)
- def getKubernetesClient: DefaultKubernetesClient = {
+ def getKubernetesClient: KubernetesClient = {
// only the three-part version number is matched (the optional suffix like
"-beta.0" is dropped)
val versionArrayOpt = "\\d+\\.\\d+\\.\\d+".r
.findFirstIn(minikubeVersionString.split(VERSION_PREFIX)(1))
@@ -58,7 +57,7 @@ private[spark] object Minikube extends Logging {
"non-numeric suffix is intentionally dropped)")
}
- new DefaultKubernetesClient(Config.autoConfigure("minikube"))
+ new
KubernetesClientBuilder().withConfig(Config.autoConfigure("minikube")).build()
}
def getMinikubeStatus(): MinikubeStatus.Value = {
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
index f2ca57f89d0..36c27116368 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -16,13 +16,13 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
import
org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
- private var defaultClient: DefaultKubernetesClient = _
+ private var defaultClient: KubernetesClient = _
override def initialize(): Unit = {
Minikube.logVersion()
@@ -40,7 +40,7 @@ private[spark] object MinikubeTestBackend extends
IntegrationTestBackend {
super.cleanUp()
}
- override def getKubernetesClient: DefaultKubernetesClient = {
+ override def getKubernetesClient: KubernetesClient = {
defaultClient
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]