This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 699b9a5ff06 [SPARK-43319][K8S][TEST] Remove usage of deprecated 
DefaultKubernetesClient
699b9a5ff06 is described below

commit 699b9a5ff06c9640ff25ddd4dd47084781b556b6
Author: Cheng Pan <[email protected]>
AuthorDate: Wed May 3 11:16:35 2023 -0700

    [SPARK-43319][K8S][TEST] Remove usage of deprecated DefaultKubernetesClient
    
    ### What changes were proposed in this pull request?
    
    Migrate from deprecated `DefaultKubernetesClient` to suggested 
`KubernetesClient`.
    
    Note: The fabric8io/kubernetes-client changes rapidly, there are still 
bunches of deprecated API usages in the codebase, would like to migrate them in 
separated PRs.
    
    ### Why are the changes needed?
    
    ```
    /**
     * Class for Default Kubernetes Client implementing KubernetesClient 
interface.
     * It is thread safe.
     *
     * @deprecated direct usage should no longer be needed. Please use the {@link KubernetesClientBuilder} instead.
     */
    @Deprecated
    public class DefaultKubernetesClient ...
    ```
    
    ```
    public interface StorageAPIGroupDSL extends Client {
      /**
       * DSL entrypoint for storage.k8s.io/v1 StorageClass
       *
       * @deprecated Use <code>client.storage().v1().storageClasses()</code> instead
       * @return {@link NonNamespaceOperation} for StorageClass
       */
      @Deprecated
      NonNamespaceOperation<StorageClass, StorageClassList, 
Resource<StorageClass>> storageClasses();
    
      ...
    }
    ```
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #40994 from pan3793/SPARK-43319.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/integrationtest/BasicTestsSuite.scala      | 10 ++--
 .../k8s/integrationtest/ClientModeTestsSuite.scala |  4 +-
 .../k8s/integrationtest/DecommissionSuite.scala    |  5 +-
 .../k8s/integrationtest/DepsTestsSuite.scala       |  4 ++
 .../k8s/integrationtest/KubernetesSuite.scala      | 28 +++++++++--
 .../integrationtest/KubernetesTestComponents.scala | 11 ++---
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  |  4 ++
 .../k8s/integrationtest/SecretsTestsSuite.scala    |  2 +
 .../spark/deploy/k8s/integrationtest/Utils.scala   |  1 +
 .../k8s/integrationtest/VolcanoTestsSuite.scala    | 57 ++++++++++++++++------
 .../backend/IntegrationTestBackend.scala           |  4 +-
 .../backend/cloud/KubeConfigBackend.scala          |  8 +--
 .../backend/minikube/Minikube.scala                |  7 ++-
 .../backend/minikube/MinikubeTestBackend.scala     |  6 +--
 14 files changed, 109 insertions(+), 42 deletions(-)

diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 3281c69fe90..66f2ae4924c 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -58,9 +58,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: 
KubernetesSuite =>
     // Verify there is no dangling statefulset
     // This depends on the garbage collection happening inside of K8s so give 
it some time.
     Eventually.eventually(TIMEOUT, INTERVAL) {
-      val sets = 
kubernetesTestComponents.kubernetesClient.apps().statefulSets().list().getItems
-      val scalaSets = sets.asScala
-      scalaSets.size shouldBe (0)
+      val sets = kubernetesTestComponents.kubernetesClient
+        .apps()
+        .statefulSets()
+        .inNamespace(kubernetesTestComponents.namespace)
+        .list()
+        .getItems
+      sets.asScala.size shouldBe 0
     }
   }
 
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 93200ea1297..074e857152e 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -53,8 +53,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: 
KubernetesSuite =>
           .endSpec()
         .build())
     try {
-      val driverPod = testBackend
-        .getKubernetesClient
+      val driverPod = testBackend.getKubernetesClient
         .pods()
         .inNamespace(kubernetesTestComponents.namespace)
         .create(new PodBuilder()
@@ -98,6 +97,7 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: 
KubernetesSuite =>
       Eventually.eventually(TIMEOUT, INTERVAL) {
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
           .withName(driverPodName)
           .getLog
           .contains("Pi is roughly 3"), "The application did not complete.")
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 81f4660afe9..aea4486e0e2 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -156,7 +156,10 @@ private[spark] trait DecommissionSuite { k8sSuite: 
KubernetesSuite =>
             PatienceConfiguration.Timeout(Span(120, Seconds)),
             PatienceConfiguration.Interval(Span(1, Seconds))) {
 
-            val currentPod = 
client.pods().withName(pod.getMetadata.getName).get
+            val currentPod = client.pods()
+              .inNamespace(kubernetesTestComponents.namespace)
+              .withName(pod.getMetadata.getName)
+              .get
             val labels = currentPod.getMetadata.getLabels.asScala
 
             labels should not be (null)
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index 2e517bfaa11..8b94a65264a 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -129,6 +129,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: 
KubernetesSuite =>
     Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .services()
+      .inNamespace(kubernetesTestComponents.namespace)
       .create(minioService))
 
     // try until the stateful set of a previous test is deleted
@@ -136,6 +137,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: 
KubernetesSuite =>
       .kubernetesClient
       .apps()
       .statefulSets()
+      .inNamespace(kubernetesTestComponents.namespace)
       .create(minioStatefulSet))
   }
 
@@ -144,6 +146,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: 
KubernetesSuite =>
       .kubernetesClient
       .apps()
       .statefulSets()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withName(cName)
       .withGracePeriod(0)
       .delete()
@@ -151,6 +154,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: 
KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .services()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withName(svcName)
       .withGracePeriod(0)
       .delete()
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 78839ee6103..19d46b1e194 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -81,6 +81,7 @@ class KubernetesSuite extends SparkFunSuite
     logInfo("END DESCRIBE PODS for the application")
     val driverPodOption = kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "driver")
       .list()
@@ -91,12 +92,14 @@ class KubernetesSuite extends SparkFunSuite
       logInfo("BEGIN driver POD log\n" +
         kubernetesTestComponents.kubernetesClient
           .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
           .withName(driverPod.getMetadata.getName)
           .getLog)
       logInfo("END driver POD log")
     }
     kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
       .list()
@@ -104,6 +107,7 @@ class KubernetesSuite extends SparkFunSuite
         val podLog = try {
           kubernetesTestComponents.kubernetesClient
             .pods()
+            .inNamespace(kubernetesTestComponents.namespace)
             .withName(execPod.getMetadata.getName)
             .getLog
         } catch {
@@ -318,6 +322,7 @@ class KubernetesSuite extends SparkFunSuite
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "driver")
       .list()
@@ -329,6 +334,7 @@ class KubernetesSuite extends SparkFunSuite
       expectedJVMValue.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
           .withName(driverPod.getMetadata.getName)
           .getLog
           .contains(e), "The application did not complete.")
@@ -385,6 +391,7 @@ class KubernetesSuite extends SparkFunSuite
 
     val execWatcher = kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
       .withLabel("spark-role", "executor")
       .watch(new Watcher[Pod] {
@@ -415,6 +422,7 @@ class KubernetesSuite extends SparkFunSuite
                 Eventually.eventually(TIMEOUT, INTERVAL) {
                   assert(kubernetesTestComponents.kubernetesClient
                     .pods()
+                    .inNamespace(kubernetesTestComponents.namespace)
                     .withName(driverPodName)
                     .getLog
                     .contains("Waiting to give nodes time to finish migration, 
decom exec 1."),
@@ -425,8 +433,12 @@ class KubernetesSuite extends SparkFunSuite
                 // We set an intentionally long grace period to test that Spark
                 // exits once the blocks are done migrating and doesn't wait 
for the
                 // entire grace period if it does not need to.
-                kubernetesTestComponents.kubernetesClient.pods()
-                  .withName(name).withGracePeriod(Int.MaxValue).delete()
+                kubernetesTestComponents.kubernetesClient
+                  .pods()
+                  .inNamespace(kubernetesTestComponents.namespace)
+                  .withName(name)
+                  .withGracePeriod(Int.MaxValue)
+                  .delete()
                 logDebug(s"Triggered pod decom/delete: $name deleted")
                 // Make sure this pod is deleted
                 Eventually.eventually(TIMEOUT, INTERVAL) {
@@ -458,6 +470,7 @@ class KubernetesSuite extends SparkFunSuite
 
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator))
       .withLabel("spark-role", "driver")
       .list()
@@ -477,6 +490,7 @@ class KubernetesSuite extends SparkFunSuite
     val execPod: Option[Pod] = if (expectedExecutorLogOnCompletion.nonEmpty) {
       Some(kubernetesTestComponents.kubernetesClient
         .pods()
+        .inNamespace(kubernetesTestComponents.namespace)
         .withLabel("spark-app-locator", appLocator)
         .withLabel("spark-role", "executor")
         .list()
@@ -490,6 +504,7 @@ class KubernetesSuite extends SparkFunSuite
       expectedDriverLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
           .withName(driverPod.getMetadata.getName)
           .getLog
           .contains(e),
@@ -498,6 +513,7 @@ class KubernetesSuite extends SparkFunSuite
       expectedExecutorLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
           .withName(execPod.get.getMetadata.getName)
           .getLog
           .contains(e),
@@ -589,7 +605,11 @@ class KubernetesSuite extends SparkFunSuite
   }
 
   private def deleteDriverPod(): Unit = {
-    
kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
+    kubernetesTestComponents.kubernetesClient
+      .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .withName(driverPodName)
+      .delete()
     Eventually.eventually(TIMEOUT, INTERVAL) {
       assert(kubernetesTestComponents.kubernetesClient
         .pods()
@@ -602,12 +622,14 @@ class KubernetesSuite extends SparkFunSuite
     kubernetesTestComponents
       .kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-app-locator", appLocator)
       .withLabel("spark-role", "executor")
       .delete()
     Eventually.eventually(TIMEOUT, INTERVAL) {
       assert(kubernetesTestComponents.kubernetesClient
         .pods()
+        .inNamespace(kubernetesTestComponents.namespace)
         .withLabel("spark-app-locator", appLocator)
         .withLabel("spark-role", "executor")
         .list()
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 4fdb89eab6e..4aba11bdb9d 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import io.fabric8.kubernetes.api.model.NamespaceBuilder
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkConf
@@ -33,7 +33,7 @@ import org.apache.spark.internal.config.JARS
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI.UI_ENABLED
 
-private[spark] class KubernetesTestComponents(defaultClient: 
DefaultKubernetesClient) {
+private[spark] class KubernetesTestComponents(val kubernetesClient: 
KubernetesClient) {
 
   val namespaceOption = Option(System.getProperty(CONFIG_KEY_KUBE_NAMESPACE))
   val hasUserSpecifiedNamespace = namespaceOption.isDefined
@@ -42,11 +42,10 @@ private[spark] class 
KubernetesTestComponents(defaultClient: DefaultKubernetesCl
   val serviceAccountName =
     Option(System.getProperty(CONFIG_KEY_KUBE_SVC_ACCOUNT))
       .getOrElse("default")
-  val kubernetesClient = defaultClient.inNamespace(namespace)
   val clientConfig = kubernetesClient.getConfiguration
 
   def createNamespace(): Unit = {
-    defaultClient.namespaces.create(new NamespaceBuilder()
+    kubernetesClient.namespaces.create(new NamespaceBuilder()
       .withNewMetadata()
       .withName(namespace)
       .endMetadata()
@@ -54,9 +53,9 @@ private[spark] class KubernetesTestComponents(defaultClient: 
DefaultKubernetesCl
   }
 
   def deleteNamespace(): Unit = {
-    defaultClient.namespaces.withName(namespace).delete()
+    kubernetesClient.namespaces.withName(namespace).delete()
     Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
-      val namespaceList = defaultClient
+      val namespaceList = kubernetesClient
         .namespaces()
         .list()
         .getItems
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
index 0362b348e13..1d373f3f806 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -41,6 +41,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite 
=>
       kubernetesTestComponents
         .kubernetesClient
         .storage()
+        .v1()
         .storageClasses()
         .create(scBuilder.build())
     } catch {
@@ -98,6 +99,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite 
=>
     kubernetesTestComponents
       .kubernetesClient
       .persistentVolumeClaims()
+      .inNamespace(kubernetesTestComponents.namespace)
       .create(pvcBuilder.build())
   }
 
@@ -105,6 +107,7 @@ private[spark] trait PVTestsSuite { k8sSuite: 
KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .persistentVolumeClaims()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withName(PVC_NAME)
       .delete()
 
@@ -117,6 +120,7 @@ private[spark] trait PVTestsSuite { k8sSuite: 
KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .storage()
+      .v1()
       .storageClasses()
       .withName(STORAGE_NAME)
       .delete()
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index 39ae94b17c6..1d999e701af 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -44,6 +44,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: 
KubernetesSuite =>
     val sec = kubernetesTestComponents
       .kubernetesClient
       .secrets()
+      .inNamespace(kubernetesTestComponents.namespace)
       .createOrReplace(envSecret)
   }
 
@@ -51,6 +52,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: 
KubernetesSuite =>
     kubernetesTestComponents
       .kubernetesClient
       .secrets()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withName(ENV_SECRET_NAME)
       .delete()
   }
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 1f39feef42d..7df8df0fb79 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -55,6 +55,7 @@ object Utils extends Logging {
     val pod = kubernetesTestComponents
       .kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withName(podName)
     // Avoid timing issues by looking for open/close
     class ReadyListener extends ExecListener {
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index f37a7644a94..06d6f7dc100 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -28,7 +28,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, Pod, Quantity}
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient
 import io.fabric8.volcano.client.VolcanoClient
 import io.fabric8.volcano.scheduling.v1beta1.{Queue, QueueBuilder}
 import org.scalatest.BeforeAndAfterEach
@@ -46,9 +45,6 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
   import 
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, 
INTERVAL, TIMEOUT,
     SPARK_DRIVER_MAIN_CLASS}
 
-  lazy val volcanoClient: VolcanoClient
-    = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
-  lazy val k8sClient: NamespacedKubernetesClient = 
kubernetesTestComponents.kubernetesClient
   private val testGroups: mutable.Set[String] = mutable.Set.empty
   private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
   private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty
@@ -58,9 +54,19 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
 
   private def deletePodInTestGroup(): Unit = {
     testGroups.foreach { g =>
-      k8sClient.pods().withLabel("spark-group-locator", g).delete()
+      kubernetesTestComponents.kubernetesClient
+        .pods()
+        .inNamespace(kubernetesTestComponents.namespace)
+        .withLabel("spark-group-locator", g)
+        .delete()
       Eventually.eventually(TIMEOUT, INTERVAL) {
-        assert(k8sClient.pods().withLabel("spark-group-locator", 
g).list().getItems.isEmpty)
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .inNamespace(kubernetesTestComponents.namespace)
+          .withLabel("spark-group-locator", g)
+          .list()
+          .getItems
+          .isEmpty)
       }
     }
     testGroups.clear()
@@ -70,7 +76,10 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
     testYAMLPaths.foreach { yaml =>
       deleteYAMLResource(yaml)
       Eventually.eventually(TIMEOUT, INTERVAL) {
-        val resources = k8sClient.load(new 
FileInputStream(yaml)).fromServer.get.asScala
+        val resources = kubernetesTestComponents.kubernetesClient
+          .load(new FileInputStream(yaml))
+          .inNamespace(kubernetesTestComponents.namespace)
+          .get.asScala
         // Make sure all elements are null (no specific resources in cluster)
         resources.foreach { r => assert(r === null) }
       }
@@ -80,9 +89,15 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
 
   private def deleteResources(): Unit = {
     testResources.foreach { _ =>
-      k8sClient.resourceList(testResources.toSeq: _*).delete()
+      kubernetesTestComponents.kubernetesClient
+        .resourceList(testResources.toSeq: _*)
+        .inNamespace(kubernetesTestComponents.namespace)
+        .delete()
       Eventually.eventually(TIMEOUT, INTERVAL) {
-        val resources = k8sClient.resourceList(testResources.toSeq: 
_*).fromServer.get.asScala
+        val resources = kubernetesTestComponents.kubernetesClient
+          .resourceList(testResources.toSeq: _*)
+          .inNamespace(kubernetesTestComponents.namespace)
+          .get().asScala
         // Make sure all elements are null (no specific resources in cluster)
         resources.foreach { r => assert(r === null) }
       }
@@ -120,7 +135,11 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
       priorityClassName: Option[String] = None): Unit = {
     val appId = pod.getMetadata.getLabels.get("spark-app-selector")
     val podGroupName = s"$appId-podgroup"
-    val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
+    val podGroup = 
kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+      .podGroups()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .withName(podGroupName)
+      .get()
     assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === 
pod.getMetadata.getName)
     queue.foreach(q => assert(q === podGroup.getSpec.getQueue))
     priorityClassName.foreach(_ =>
@@ -128,7 +147,10 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
   }
 
   private def createOrReplaceResource(resource: Queue): Unit = {
-    volcanoClient.queues().createOrReplace(resource)
+    kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+      .queues()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .createOrReplace(resource)
     testResources += resource
   }
 
@@ -152,20 +174,27 @@ private[spark] trait VolcanoTestsSuite extends 
BeforeAndAfterEach { k8sSuite: Ku
   }
 
   private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
-    k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
+    kubernetesTestComponents.kubernetesClient
+      .load(new FileInputStream(yamlPath))
+      .inNamespace(kubernetesTestComponents.namespace)
+      .createOrReplace()
     testYAMLPaths += yamlPath
   }
 
   private def deleteYAMLResource(yamlPath: String): Unit = {
-    k8sClient.load(new FileInputStream(yamlPath)).delete()
+    kubernetesTestComponents.kubernetesClient
+      .load(new FileInputStream(yamlPath))
+      .inNamespace(kubernetesTestComponents.namespace)
+      .delete()
   }
 
   private def getPods(
       role: String,
       groupLocator: String,
       statusPhase: String): mutable.Buffer[Pod] = {
-    k8sClient
+    kubernetesTestComponents.kubernetesClient
       .pods()
+      .inNamespace(kubernetesTestComponents.namespace)
       .withLabel("spark-group-locator", groupLocator)
       .withLabel("spark-role", role)
       .withField("status.phase", statusPhase)
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
index ced8151b709..f767f9be55a 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.k8s.integrationtest.backend
 
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
@@ -27,7 +27,7 @@ import 
org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTest
 
 private[spark] trait IntegrationTestBackend {
   def initialize(): Unit
-  def getKubernetesClient: DefaultKubernetesClient
+  def getKubernetesClient: KubernetesClient
   def cleanUp(): Unit = {}
   def describePods(labels: String): Seq[String] =
     ProcessUtils.executeProcess(
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
index b2d11828232..aa34cc617d6 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.cloud
 
-import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.{Config, KubernetesClient, 
KubernetesClientBuilder}
 import io.fabric8.kubernetes.client.utils.Utils
 import org.apache.commons.lang3.StringUtils
 
@@ -31,7 +31,7 @@ private[spark] class KubeConfigBackend(var context: String)
     s"${if (context != null) s"context ${context}" else "default context"}" +
     s" from users K8S config file")
 
-  private var defaultClient: DefaultKubernetesClient = _
+  private var defaultClient: KubernetesClient = _
 
   override def initialize(): Unit = {
     // Auto-configure K8S client from K8S config file
@@ -55,7 +55,7 @@ private[spark] class KubeConfigBackend(var context: String)
       }
     }
 
-    defaultClient = new DefaultKubernetesClient(config)
+    defaultClient = new KubernetesClientBuilder().withConfig(config).build()
   }
 
   override def cleanUp(): Unit = {
@@ -65,7 +65,7 @@ private[spark] class KubeConfigBackend(var context: String)
     super.cleanUp()
   }
 
-  override def getKubernetesClient: DefaultKubernetesClient = {
+  override def getKubernetesClient: KubernetesClient = {
     defaultClient
   }
 }
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 70a849c37e4..1d2f27fe7b9 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import io.fabric8.kubernetes.client.Config
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.{Config, KubernetesClient, 
KubernetesClientBuilder}
 
 import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
 import org.apache.spark.internal.Logging
@@ -40,7 +39,7 @@ private[spark] object Minikube extends Logging {
   def logVersion(): Unit =
     logInfo(minikubeVersionString)
 
-  def getKubernetesClient: DefaultKubernetesClient = {
+  def getKubernetesClient: KubernetesClient = {
     // only the three-part version number is matched (the optional suffix like 
"-beta.0" is dropped)
     val versionArrayOpt = "\\d+\\.\\d+\\.\\d+".r
       .findFirstIn(minikubeVersionString.split(VERSION_PREFIX)(1))
@@ -58,7 +57,7 @@ private[spark] object Minikube extends Logging {
           "non-numeric suffix is intentionally dropped)")
     }
 
-    new DefaultKubernetesClient(Config.autoConfigure("minikube"))
+    new 
KubernetesClientBuilder().withConfig(Config.autoConfigure("minikube")).build()
   }
 
   def getMinikubeStatus(): MinikubeStatus.Value = {
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
index f2ca57f89d0..36c27116368 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -16,13 +16,13 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.KubernetesClient
 
 import 
org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
 
 private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
 
-  private var defaultClient: DefaultKubernetesClient = _
+  private var defaultClient: KubernetesClient = _
 
   override def initialize(): Unit = {
     Minikube.logVersion()
@@ -40,7 +40,7 @@ private[spark] object MinikubeTestBackend extends 
IntegrationTestBackend {
     super.cleanUp()
   }
 
-  override def getKubernetesClient: DefaultKubernetesClient = {
+  override def getKubernetesClient: KubernetesClient = {
     defaultClient
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to