This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 197e094  [SPARK-49052] Add SparkOperator class and tests
197e094 is described below

commit 197e094d9bc516c99d7c9776b3f495927f52e092
Author: zhou-jiang <[email protected]>
AuthorDate: Thu Aug 1 12:38:29 2024 -0700

    [SPARK-49052] Add SparkOperator class and tests
    
    ### What changes were proposed in this pull request?
    
    This PR adds class `org.apache.spark.k8s.operator.SparkOperator` as the 
main entry point of Spark Operator.
    
    ### Why are the changes needed?
    
    This bootstraps operator with metrics servers as needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass the CIs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #29 from jiangzho/bootstrap.
    
    Authored-by: zhou-jiang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 gradle.properties                                  |   1 +
 spark-operator/build.gradle                        |   1 +
 .../apache/spark/k8s/operator/SparkOperator.java   | 223 +++++++++++++++++++++
 .../org/apache/spark/k8s/operator/utils/Utils.java |   8 +
 .../spark/k8s/operator/SparkOperatorTest.java      | 193 ++++++++++++++++++
 .../apache/spark/k8s/operator/utils/TestUtils.java |   8 +
 6 files changed, 434 insertions(+)

diff --git a/gradle.properties b/gradle.properties
index 8711fc2..2525866 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -38,6 +38,7 @@ log4jVersion=2.22.1
 junitVersion=5.10.2
 jacocoVersion=0.8.12
 mockitoVersion=5.11.0
+powerMockVersion=2.0.9
 
 # Build Analysis
 checkstyleVersion=10.17.0
diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle
index 6a733c6..e1b3586 100644
--- a/spark-operator/build.gradle
+++ b/spark-operator/build.gradle
@@ -52,6 +52,7 @@ dependencies {
   testImplementation("com.squareup.okhttp3:mockwebserver:$okHttpVersion")
   testImplementation platform("org.junit:junit-bom:$junitVersion")
   testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")
+  testImplementation("org.powermock:powermock-core:$powerMockVersion")
   testRuntimeOnly("org.junit.platform:junit-platform-launcher")
 
   testImplementation("org.mockito:mockito-core:$mockitoVersion")
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
new file mode 100644
index 0000000..d9d50a1
--- /dev/null
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.apache.spark.k8s.operator.utils.Utils.getAppStatusListener;
+import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
+import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
+import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
+import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler;
+import org.apache.spark.k8s.operator.metrics.MetricsService;
+import org.apache.spark.k8s.operator.metrics.MetricsSystem;
+import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import org.apache.spark.k8s.operator.metrics.healthcheck.SentinelManager;
+import 
org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
+import org.apache.spark.k8s.operator.metrics.source.OperatorJosdkMetrics;
+import org.apache.spark.k8s.operator.probe.ProbeService;
+import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+
+/**
+ * Entry point for Spark Operator. Bootstraps the operator app by starting the watch and
+ * reconciler for SparkApps, starting the watch for hot property loading (if enabled), and
+ * starting the metrics server with sentinel monitor (if enabled).
+ */
+@Slf4j
+public class SparkOperator {
+  private final List<Operator> registeredOperators;
+  private final KubernetesClient client;
+  private final SparkAppSubmissionWorker appSubmissionWorker;
+  private final SparkAppStatusRecorder sparkAppStatusRecorder;
+  protected Set<RegisteredController<?>> registeredSparkControllers;
+  protected Set<String> watchedNamespaces;
+  private final MetricsSystem metricsSystem;
+  private final SentinelManager<SparkApplication> 
sparkApplicationSentinelManager;
+  private final ProbeService probeService;
+  private final MetricsService metricsService;
+  private final ExecutorService metricsResourcesSingleThreadPool;
+
+  public SparkOperator() {
+    this.metricsSystem = MetricsSystemFactory.createMetricsSystem();
+    this.client =
+        
KubernetesClientFactory.buildKubernetesClient(getClientInterceptors(metricsSystem));
+    this.appSubmissionWorker = new SparkAppSubmissionWorker();
+    this.sparkAppStatusRecorder = new 
SparkAppStatusRecorder(getAppStatusListener());
+    this.registeredSparkControllers = new HashSet<>();
+    this.watchedNamespaces = getWatchedNamespaces();
+    this.sparkApplicationSentinelManager = new SentinelManager<>();
+    this.registeredOperators = new ArrayList<>();
+    this.registeredOperators.add(registerSparkOperator());
+    if (SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue()) {
+      this.registeredOperators.add(registerSparkOperatorConfMonitor());
+    }
+    this.metricsResourcesSingleThreadPool = 
Executors.newSingleThreadExecutor();
+    this.probeService =
+        new ProbeService(
+            registeredOperators, 
Collections.singletonList(sparkApplicationSentinelManager), null);
+    this.metricsService = new MetricsService(metricsSystem, 
metricsResourcesSingleThreadPool);
+  }
+
+  protected Operator registerSparkOperator() {
+    Operator op = new Operator(this::overrideOperatorConfigs);
+    registeredSparkControllers.add(
+        op.register(
+            new SparkAppReconciler(
+                appSubmissionWorker, sparkAppStatusRecorder, 
sparkApplicationSentinelManager),
+            this::overrideControllerConfigs));
+    return op;
+  }
+
+  protected Operator registerSparkOperatorConfMonitor() {
+    Operator op = new Operator(this::overrideConfigMonitorConfigs);
+    String operatorNamespace = SparkOperatorConf.OPERATOR_NAMESPACE.getValue();
+    String confSelector = SparkOperatorConf.DYNAMIC_CONFIG_SELECTOR.getValue();
+    log.info(
+        "Starting conf monitor in namespace: {}, with selector: {}",
+        operatorNamespace,
+        confSelector);
+    op.register(
+        new SparkOperatorConfigMapReconciler(
+            this::updateWatchingNamespaces,
+            SparkOperatorConf.OPERATOR_NAMESPACE.getValue(),
+            unused -> getWatchedNamespaces()),
+        c -> {
+          c.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
+          c.settingNamespaces(operatorNamespace);
+          c.withLabelSelector(confSelector);
+        });
+    return op;
+  }
+
+  protected boolean updateWatchingNamespaces(Set<String> namespaces) {
+    if (watchedNamespaces.equals(namespaces)) {
+      log.info("No watched namespace change detected");
+      return false;
+    }
+    if (watchedNamespaces.isEmpty()) {
+      log.info("Cannot update watch namespaces for operator started at cluster 
level.");
+      return false;
+    }
+    if (namespaces == null || namespaces.isEmpty()) {
+      log.error("Cannot update namespaces to empty");
+      return false;
+    }
+    registeredSparkControllers.forEach(
+        c -> {
+          if (c.allowsNamespaceChanges()) {
+            log.info("Updating operator namespaces to {}", namespaces);
+            c.changeNamespaces(namespaces);
+          } else {
+            log.error("Controller does not allow namespace change, skipping 
namespace change.");
+          }
+        });
+    this.watchedNamespaces = new HashSet<>(namespaces);
+    return true;
+  }
+
+  protected void overrideOperatorConfigs(ConfigurationServiceOverrider 
overrider) {
+    overrider.withKubernetesClient(client);
+    overrider.withStopOnInformerErrorDuringStartup(
+        SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
+    overrider.withTerminationTimeoutSeconds(
+        SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
+    int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
+    if (parallelism > 0) {
+      log.info("Configuring operator with {} reconciliation threads.", 
parallelism);
+      overrider.withConcurrentReconciliationThreads(parallelism);
+    } else {
+      log.info("Configuring operator with unbounded reconciliation thread 
pool.");
+      overrider.withExecutorService(Executors.newCachedThreadPool());
+    }
+    if (SparkOperatorConf.LEADER_ELECTION_ENABLED.getValue()) {
+      
overrider.withLeaderElectionConfiguration(SparkOperatorConf.getLeaderElectionConfig());
+    }
+    if (SparkOperatorConf.JOSDK_METRICS_ENABLED.getValue()) {
+      log.info("Adding Operator JosdkMetrics to metrics system.");
+      OperatorJosdkMetrics operatorJosdkMetrics = new OperatorJosdkMetrics();
+      overrider.withMetrics(operatorJosdkMetrics);
+      metricsSystem.registerSource(operatorJosdkMetrics);
+    }
+  }
+
+  protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider 
overrider) {
+    overrider.withKubernetesClient(client);
+    overrider.withConcurrentReconciliationThreads(
+        SparkOperatorConf.DYNAMIC_CONFIG_RECONCILER_PARALLELISM.getValue());
+    overrider.withStopOnInformerErrorDuringStartup(true);
+    overrider.withCloseClientOnStop(false);
+    overrider.withInformerStoppedHandler(
+        (informer, ex) ->
+            log.error("Dynamic config informer stopped: operator will not 
accept config updates."));
+  }
+
+  protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> 
overrider) {
+    if (watchedNamespaces.isEmpty()) {
+      log.info("Initializing operator watching at cluster level.");
+    } else {
+      log.info("Initializing with watched namespaces {}", watchedNamespaces);
+    }
+    overrider.settingNamespaces(watchedNamespaces);
+    overrider.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
+    overrider.withRetry(SparkOperatorConf.getOperatorRetry());
+  }
+
+  protected List<Interceptor> getClientInterceptors(MetricsSystem 
metricsSystem) {
+    List<Interceptor> clientInterceptors = new ArrayList<>();
+    if (SparkOperatorConf.KUBERNETES_CLIENT_METRICS_ENABLED.getValue()) {
+      KubernetesMetricsInterceptor metricsInterceptor = new 
KubernetesMetricsInterceptor();
+      clientInterceptors.add(metricsInterceptor);
+      metricsSystem.registerSource(metricsInterceptor);
+    }
+    return clientInterceptors;
+  }
+
+  /**
+   * Bootstraps the Spark Operator.
+   *
+   * @param args not used - operator behavior is configured from SparkOperatorConf
+   */
+  public static void main(String[] args) {
+    SparkOperator sparkOperator = new SparkOperator();
+    for (Operator operator : sparkOperator.registeredOperators) {
+      operator.start();
+    }
+    sparkOperator.probeService.start();
+    // Single thread queue to ensure MetricsService starts after the 
MetricsSystem
+    
sparkOperator.metricsResourcesSingleThreadPool.submit(sparkOperator.metricsSystem::start);
+    
sparkOperator.metricsResourcesSingleThreadPool.submit(sparkOperator.metricsService::start);
+  }
+}
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index aae6df9..ffd4bc3 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -25,10 +25,12 @@ import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VA
 import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
 import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
+import static 
org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -37,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.spark.k8s.operator.Constants;
 import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
 
 public class Utils {
   public static Set<String> sanitizeCommaSeparatedStrAsSet(String str) {
@@ -102,6 +105,11 @@ public class Utils {
     return 
Utils.sanitizeCommaSeparatedStrAsSet(OPERATOR_WATCHED_NAMESPACES.getValue());
   }
 
+  public static List<SparkAppStatusListener> getAppStatusListener() {
+    return ClassLoadingUtils.getStatusListener(
+        SparkAppStatusListener.class, 
SPARK_APP_STATUS_LISTENER_CLASS_NAMES.getValue());
+  }
+
   /**
    * Labels to be applied to all created resources, as a comma-separated string
    *
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
new file mode 100644
index 0000000..7cfc26c
--- /dev/null
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.apache.spark.k8s.operator.utils.TestUtils.setConfigKey;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
+import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler;
+import org.apache.spark.k8s.operator.metrics.MetricsService;
+import org.apache.spark.k8s.operator.metrics.MetricsSystem;
+import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import 
org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
+import org.apache.spark.k8s.operator.probe.ProbeService;
+import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
+import org.apache.spark.k8s.operator.utils.Utils;
+
+class SparkOperatorTest {
+
+  @Test
+  void testOperatorConstructionWithDynamicConfigEnabled() {
+    MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    boolean dynamicConfigEnabled = 
SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+
+    try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+            mockStatic(MetricsSystemFactory.class);
+        MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+            mockStatic(KubernetesClientFactory.class);
+        MockedStatic<Utils> mockUtils = mockStatic(Utils.class);
+        MockedConstruction<Operator> operatorConstruction = 
mockConstruction(Operator.class);
+        MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+            mockConstruction(SparkAppReconciler.class);
+        MockedConstruction<SparkOperatorConfigMapReconciler> 
configReconcilerConstruction =
+            mockConstruction(SparkOperatorConfigMapReconciler.class);
+        MockedConstruction<ProbeService> probeServiceConstruction =
+            mockConstruction(ProbeService.class);
+        MockedConstruction<MetricsService> metricsServiceConstruction =
+            mockConstruction(MetricsService.class);
+        MockedConstruction<KubernetesMetricsInterceptor> 
interceptorMockedConstruction =
+            mockConstruction(KubernetesMetricsInterceptor.class)) {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, true);
+      mockMetricsSystemFactory
+          .when(MetricsSystemFactory::createMetricsSystem)
+          .thenReturn(mockMetricsSystem);
+      mockKubernetesClientFactory
+          .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+          .thenReturn(mockClient);
+      
mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Collections.singleton("namespace-1"));
+
+      SparkOperator sparkOperator = new SparkOperator();
+      Assertions.assertEquals(1, 
sparkOperator.registeredSparkControllers.size());
+      Assertions.assertEquals(2, operatorConstruction.constructed().size());
+      Assertions.assertEquals(1, 
sparkAppReconcilerConstruction.constructed().size());
+      Assertions.assertEquals(1, 
configReconcilerConstruction.constructed().size());
+      Assertions.assertEquals(1, 
probeServiceConstruction.constructed().size());
+      Assertions.assertEquals(1, 
metricsServiceConstruction.constructed().size());
+      Assertions.assertEquals(1, 
interceptorMockedConstruction.constructed().size());
+      
verify(mockMetricsSystem).registerSource(interceptorMockedConstruction.constructed().get(0));
+
+      SparkAppReconciler sparkAppReconciler = 
sparkAppReconcilerConstruction.constructed().get(0);
+      Operator sparkAppOperator = operatorConstruction.constructed().get(0);
+      verify(sparkAppOperator).register(eq(sparkAppReconciler), 
any(Consumer.class));
+    } finally {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, 
dynamicConfigEnabled);
+    }
+  }
+
+  @Test
+  void testOperatorConstructionWithDynamicConfigDisabled() {
+    MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    boolean dynamicConfigEnabled = 
SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+    try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+            mockStatic(MetricsSystemFactory.class);
+        MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+            mockStatic(KubernetesClientFactory.class);
+        MockedConstruction<Operator> operatorConstruction = 
mockConstruction(Operator.class);
+        MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+            mockConstruction(SparkAppReconciler.class);
+        MockedConstruction<ProbeService> probeServiceConstruction =
+            mockConstruction(ProbeService.class);
+        MockedConstruction<MetricsService> metricsServiceConstruction =
+            mockConstruction(MetricsService.class);
+        MockedConstruction<KubernetesMetricsInterceptor> 
interceptorMockedConstruction =
+            mockConstruction(KubernetesMetricsInterceptor.class)) {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, false);
+      mockMetricsSystemFactory
+          .when(MetricsSystemFactory::createMetricsSystem)
+          .thenReturn(mockMetricsSystem);
+      mockKubernetesClientFactory
+          .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+          .thenReturn(mockClient);
+      SparkOperator sparkOperator = new SparkOperator();
+      Assertions.assertEquals(1, 
sparkOperator.registeredSparkControllers.size());
+      Assertions.assertEquals(1, operatorConstruction.constructed().size());
+      Assertions.assertEquals(1, 
sparkAppReconcilerConstruction.constructed().size());
+      Assertions.assertEquals(1, 
probeServiceConstruction.constructed().size());
+      Assertions.assertEquals(1, 
metricsServiceConstruction.constructed().size());
+      Assertions.assertEquals(1, 
interceptorMockedConstruction.constructed().size());
+      
verify(mockMetricsSystem).registerSource(interceptorMockedConstruction.constructed().get(0));
+    } finally {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, 
dynamicConfigEnabled);
+    }
+  }
+
+  @Test
+  void testUpdateWatchedNamespacesWithDynamicConfigEnabled() {
+    MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+    KubernetesClient mockClient = mock(KubernetesClient.class);
+    var registeredController = mock(RegisteredController.class);
+    when(registeredController.allowsNamespaceChanges()).thenReturn(true);
+    boolean dynamicConfigEnabled = 
SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+
+    try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+            mockStatic(MetricsSystemFactory.class);
+        MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+            mockStatic(KubernetesClientFactory.class);
+        MockedStatic<Utils> mockUtils = mockStatic(Utils.class);
+        MockedConstruction<Operator> operatorConstruction =
+            mockConstruction(
+                Operator.class,
+                (mock, context) -> {
+                  when(mock.register(any(SparkAppReconciler.class), 
any(Consumer.class)))
+                      .thenReturn(registeredController);
+                });
+        MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+            mockConstruction(SparkAppReconciler.class);
+        MockedConstruction<SparkOperatorConfigMapReconciler> 
configReconcilerConstruction =
+            mockConstruction(SparkOperatorConfigMapReconciler.class);
+        MockedConstruction<ProbeService> probeServiceConstruction =
+            mockConstruction(ProbeService.class);
+        MockedConstruction<MetricsService> metricsServiceConstruction =
+            mockConstruction(MetricsService.class);
+        MockedConstruction<KubernetesMetricsInterceptor> 
interceptorMockedConstruction =
+            mockConstruction(KubernetesMetricsInterceptor.class)) {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, true);
+      mockMetricsSystemFactory
+          .when(MetricsSystemFactory::createMetricsSystem)
+          .thenReturn(mockMetricsSystem);
+      mockKubernetesClientFactory
+          .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+          .thenReturn(mockClient);
+      
mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Collections.singleton("namespace-1"));
+      SparkOperator sparkOperator = new SparkOperator();
+      Set<String> updatedNamespaces = Set.of("namespace-1", "namespace-2");
+      
Assertions.assertTrue(sparkOperator.updateWatchingNamespaces(updatedNamespaces));
+      Assertions.assertEquals(updatedNamespaces, 
sparkOperator.watchedNamespaces);
+      verify(registeredController).allowsNamespaceChanges();
+      verify(registeredController).changeNamespaces(updatedNamespaces);
+      verifyNoMoreInteractions(registeredController);
+    } finally {
+      setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, 
dynamicConfigEnabled);
+    }
+  }
+}
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
index 59b3989..1603cfa 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
@@ -26,9 +26,11 @@ import java.io.File;
 import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
+import org.powermock.reflect.Whitebox;
 
 import org.apache.spark.k8s.operator.Constants;
 import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.config.ConfigOption;
 
 public class TestUtils {
   public static SparkApplication createMockApp(String namespace) {
@@ -60,4 +62,10 @@ public class TestUtils {
   public static long calculateElapsedTimeInMills(long startTime) {
     return System.currentTimeMillis() - startTime;
   }
+
+  public static <T> T setConfigKey(ConfigOption<T> configKey, T newValue) {
+    T originalPropertyValue = configKey.getValue();
+    Whitebox.setInternalState(configKey, "defaultValue", newValue);
+    return originalPropertyValue;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to