This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 197e094 [SPARK-49052] Add SparkOperator class and tests
197e094 is described below
commit 197e094d9bc516c99d7c9776b3f495927f52e092
Author: zhou-jiang <[email protected]>
AuthorDate: Thu Aug 1 12:38:29 2024 -0700
[SPARK-49052] Add SparkOperator class and tests
### What changes were proposed in this pull request?
This PR adds class `org.apache.spark.k8s.operator.SparkOperator` as the
main entry point of Spark Operator.
### Why are the changes needed?
This bootstraps the operator and starts its metrics servers as needed.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass the CIs
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #29 from jiangzho/bootstrap.
Authored-by: zhou-jiang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
gradle.properties | 1 +
spark-operator/build.gradle | 1 +
.../apache/spark/k8s/operator/SparkOperator.java | 223 +++++++++++++++++++++
.../org/apache/spark/k8s/operator/utils/Utils.java | 8 +
.../spark/k8s/operator/SparkOperatorTest.java | 193 ++++++++++++++++++
.../apache/spark/k8s/operator/utils/TestUtils.java | 8 +
6 files changed, 434 insertions(+)
diff --git a/gradle.properties b/gradle.properties
index 8711fc2..2525866 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -38,6 +38,7 @@ log4jVersion=2.22.1
junitVersion=5.10.2
jacocoVersion=0.8.12
mockitoVersion=5.11.0
+powerMockVersion=2.0.9
# Build Analysis
checkstyleVersion=10.17.0
diff --git a/spark-operator/build.gradle b/spark-operator/build.gradle
index 6a733c6..e1b3586 100644
--- a/spark-operator/build.gradle
+++ b/spark-operator/build.gradle
@@ -52,6 +52,7 @@ dependencies {
testImplementation("com.squareup.okhttp3:mockwebserver:$okHttpVersion")
testImplementation platform("org.junit:junit-bom:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter:$junitVersion")
+ testImplementation("org.powermock:powermock-core:$powerMockVersion")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation("org.mockito:mockito-core:$mockitoVersion")
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
new file mode 100644
index 0000000..d9d50a1
--- /dev/null
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.apache.spark.k8s.operator.utils.Utils.getAppStatusListener;
+import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
+import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
+import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
+import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler;
+import org.apache.spark.k8s.operator.metrics.MetricsService;
+import org.apache.spark.k8s.operator.metrics.MetricsSystem;
+import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import org.apache.spark.k8s.operator.metrics.healthcheck.SentinelManager;
+import
org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
+import org.apache.spark.k8s.operator.metrics.source.OperatorJosdkMetrics;
+import org.apache.spark.k8s.operator.probe.ProbeService;
+import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
+import org.apache.spark.k8s.operator.utils.SparkAppStatusRecorder;
+
+/**
+ * Entry point for Spark Operator. Bootstraps the operator app by starting the watch and reconciler
+ * for SparkApps, starting the watch for hot property loading if enabled, and starting the metrics
+ * server with the sentinel monitor if enabled.
+ */
+@Slf4j
+public class SparkOperator {
+ private final List<Operator> registeredOperators;
+ private final KubernetesClient client;
+ private final SparkAppSubmissionWorker appSubmissionWorker;
+ private final SparkAppStatusRecorder sparkAppStatusRecorder;
+ protected Set<RegisteredController<?>> registeredSparkControllers;
+ protected Set<String> watchedNamespaces;
+ private final MetricsSystem metricsSystem;
+ private final SentinelManager<SparkApplication>
sparkApplicationSentinelManager;
+ private final ProbeService probeService;
+ private final MetricsService metricsService;
+ private final ExecutorService metricsResourcesSingleThreadPool;
+
+ /**
+  * Builds every component of the operator (client, reconcilers, probe and metrics services)
+  * without starting any of them; {@link #main(String[])} performs the actual start-up.
+  *
+  * <p>Statement order matters: the registration helpers called below read fields that are
+  * assigned earlier in this constructor (submission worker, status recorder, sentinel manager,
+  * controller set, watched namespaces).
+  */
+ public SparkOperator() {
+   // Created first because getClientInterceptors registers its interceptor with this system.
+   this.metricsSystem = MetricsSystemFactory.createMetricsSystem();
+   this.client =
+       KubernetesClientFactory.buildKubernetesClient(getClientInterceptors(metricsSystem));
+   this.appSubmissionWorker = new SparkAppSubmissionWorker();
+   this.sparkAppStatusRecorder = new SparkAppStatusRecorder(getAppStatusListener());
+   this.registeredSparkControllers = new HashSet<>();
+   this.watchedNamespaces = getWatchedNamespaces();
+   this.sparkApplicationSentinelManager = new SentinelManager<>();
+   this.registeredOperators = new ArrayList<>();
+   // The SparkApp operator is always registered; the config monitor only when dynamic config is on.
+   this.registeredOperators.add(registerSparkOperator());
+   if (SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue()) {
+     this.registeredOperators.add(registerSparkOperatorConfMonitor());
+   }
+   // Shared with MetricsService here and used by main to queue the metrics start-up tasks.
+   this.metricsResourcesSingleThreadPool = Executors.newSingleThreadExecutor();
+   // NOTE(review): the third ProbeService argument is passed as null; its meaning is not visible
+   // in this file - confirm against ProbeService.
+   this.probeService =
+       new ProbeService(
+           registeredOperators, Collections.singletonList(sparkApplicationSentinelManager), null);
+   this.metricsService = new MetricsService(metricsSystem, metricsResourcesSingleThreadPool);
+ }
+
+ /**
+  * Creates the operator that reconciles SparkApplications and records the controller returned
+  * by the registration in {@code registeredSparkControllers}.
+  *
+  * @return the new operator with the SparkApp reconciler registered
+  */
+ protected Operator registerSparkOperator() {
+   Operator sparkAppOperator = new Operator(this::overrideOperatorConfigs);
+   SparkAppReconciler reconciler =
+       new SparkAppReconciler(
+           appSubmissionWorker, sparkAppStatusRecorder, sparkApplicationSentinelManager);
+   RegisteredController<?> controller =
+       sparkAppOperator.register(reconciler, this::overrideControllerConfigs);
+   registeredSparkControllers.add(controller);
+   return sparkAppOperator;
+ }
+
+ /**
+  * Creates the operator that watches the operator's own namespace for dynamic configuration,
+  * restricted by the configured label selector.
+  *
+  * <p>The reconciler receives {@link #updateWatchingNamespaces(Set)} as its namespace-update
+  * callback and a function that ignores its argument and returns the watched namespaces.
+  *
+  * @return the new operator with the config map reconciler registered
+  */
+ protected Operator registerSparkOperatorConfMonitor() {
+   Operator op = new Operator(this::overrideConfigMonitorConfigs);
+   // Read each setting once so the logged values are exactly the ones used for registration.
+   String operatorNamespace = SparkOperatorConf.OPERATOR_NAMESPACE.getValue();
+   String confSelector = SparkOperatorConf.DYNAMIC_CONFIG_SELECTOR.getValue();
+   log.info(
+       "Starting conf monitor in namespace: {}, with selector: {}",
+       operatorNamespace,
+       confSelector);
+   op.register(
+       new SparkOperatorConfigMapReconciler(
+           this::updateWatchingNamespaces, operatorNamespace, unused -> getWatchedNamespaces()),
+       c -> {
+         c.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
+         c.settingNamespaces(operatorNamespace);
+         c.withLabelSelector(confSelector);
+       });
+   return op;
+ }
+
+ /**
+  * Applies a new set of watched namespaces to the registered SparkApp controllers.
+  *
+  * <p>The request is ignored when the set equals the current one, when the operator currently
+  * has no watched namespaces (started at cluster level), or when the requested set is null or
+  * empty.
+  *
+  * @param namespaces namespaces the operator should watch from now on
+  * @return {@code true} if the watched namespaces were replaced, {@code false} if the request
+  *     was ignored
+  */
+ protected boolean updateWatchingNamespaces(Set<String> namespaces) {
+   if (watchedNamespaces.equals(namespaces)) {
+     log.info("No watched namespace change detected");
+     return false;
+   }
+   if (watchedNamespaces.isEmpty()) {
+     log.info("Cannot update watch namespaces for operator started at cluster level.");
+     return false;
+   }
+   if (namespaces == null || namespaces.isEmpty()) {
+     log.error("Cannot update watched namespaces to an empty set.");
+     return false;
+   }
+   for (RegisteredController<?> controller : registeredSparkControllers) {
+     if (controller.allowsNamespaceChanges()) {
+       log.info("Updating operator namespaces to {}", namespaces);
+       controller.changeNamespaces(namespaces);
+     } else {
+       log.error("Controller does not allow namespace change, skipping namespace change.");
+     }
+   }
+   // NOTE(review): the field is replaced and true is returned even when a controller refused the
+   // change above; confirm whether callers rely on that.
+   // Copy defensively so later changes to the caller's set do not affect this operator.
+   this.watchedNamespaces = new HashSet<>(namespaces);
+   return true;
+ }
+
+ /**
+  * Customizes the configuration service of the SparkApp operator: Kubernetes client, informer
+  * failure handling, termination timeout, reconciliation threading, and optionally leader
+  * election and JOSDK metrics.
+  *
+  * @param overrider configuration overrider supplied by the operator being built
+  */
+ protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
+   overrider.withKubernetesClient(client);
+   overrider.withStopOnInformerErrorDuringStartup(
+       SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
+   overrider.withTerminationTimeoutSeconds(
+       SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
+
+   // A non-positive parallelism selects a cached (unbounded) thread pool instead of a fixed size.
+   int reconcilerThreads = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
+   if (reconcilerThreads <= 0) {
+     log.info("Configuring operator with unbounded reconciliation thread pool.");
+     overrider.withExecutorService(Executors.newCachedThreadPool());
+   } else {
+     log.info("Configuring operator with {} reconciliation threads.", reconcilerThreads);
+     overrider.withConcurrentReconciliationThreads(reconcilerThreads);
+   }
+
+   if (SparkOperatorConf.LEADER_ELECTION_ENABLED.getValue()) {
+     overrider.withLeaderElectionConfiguration(SparkOperatorConf.getLeaderElectionConfig());
+   }
+
+   if (SparkOperatorConf.JOSDK_METRICS_ENABLED.getValue()) {
+     log.info("Adding Operator JosdkMetrics to metrics system.");
+     // The same instance is handed to the overrider and registered as a metrics source.
+     OperatorJosdkMetrics josdkMetrics = new OperatorJosdkMetrics();
+     overrider.withMetrics(josdkMetrics);
+     metricsSystem.registerSource(josdkMetrics);
+   }
+ }
+
+ /**
+  * Customizes the configuration service of the dynamic-config monitor operator.
+  *
+  * <p>It uses the operator's Kubernetes client, is told not to close that client on stop, stops
+  * on informer errors during startup, and logs an error (including the cause) if its informer
+  * stops later.
+  *
+  * @param overrider configuration overrider supplied by the operator being built
+  */
+ protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overrider) {
+   overrider.withKubernetesClient(client);
+   overrider.withConcurrentReconciliationThreads(
+       SparkOperatorConf.DYNAMIC_CONFIG_RECONCILER_PARALLELISM.getValue());
+   overrider.withStopOnInformerErrorDuringStartup(true);
+   overrider.withCloseClientOnStop(false);
+   overrider.withInformerStoppedHandler(
+       (informer, ex) ->
+           // Pass the throwable as the last argument so the failure cause is logged too.
+           log.error(
+               "Dynamic config informer stopped: operator will not accept config updates.", ex));
+ }
+
+ /**
+  * Customizes the SparkApp controller: watched namespaces, rate limiter and retry.
+  *
+  * <p>An empty {@code watchedNamespaces} set is logged as watching at cluster level.
+  *
+  * @param overrider controller configuration overrider supplied during registration
+  */
+ protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
+   if (!watchedNamespaces.isEmpty()) {
+     log.info("Initializing with watched namespaces {}", watchedNamespaces);
+   } else {
+     log.info("Initializing operator watching at cluster level.");
+   }
+
+   overrider.settingNamespaces(watchedNamespaces);
+   overrider.withRateLimiter(SparkOperatorConf.getOperatorRateLimiter());
+   overrider.withRetry(SparkOperatorConf.getOperatorRetry());
+ }
+
+ /**
+  * Builds the interceptors to install on the Kubernetes client.
+  *
+  * <p>When Kubernetes client metrics are enabled, a single metrics interceptor is added to the
+  * list and also registered as a source of the given metrics system.
+  *
+  * @param metricsSystem metrics system that receives the interceptor as a source
+  * @return a new list holding the metrics interceptor, or an empty list when metrics are disabled
+  */
+ protected List<Interceptor> getClientInterceptors(MetricsSystem metricsSystem) {
+   List<Interceptor> interceptors = new ArrayList<>();
+   if (!SparkOperatorConf.KUBERNETES_CLIENT_METRICS_ENABLED.getValue()) {
+     return interceptors;
+   }
+   KubernetesMetricsInterceptor kubernetesMetrics = new KubernetesMetricsInterceptor();
+   interceptors.add(kubernetesMetrics);
+   metricsSystem.registerSource(kubernetesMetrics);
+   return interceptors;
+ }
+
+ /**
+  * Launches the Spark Operator: starts every registered operator, then the probe service, then
+  * queues the metrics system and metrics service start-up.
+  *
+  * @param args ignored; operator behavior is configured through SparkOperatorConf
+  */
+ public static void main(String[] args) {
+   SparkOperator operatorApp = new SparkOperator();
+   operatorApp.registeredOperators.forEach(Operator::start);
+   operatorApp.probeService.start();
+   // Both tasks go through the same single-thread executor, so the MetricsService start is
+   // queued behind the MetricsSystem start.
+   ExecutorService startupQueue = operatorApp.metricsResourcesSingleThreadPool;
+   startupQueue.submit(operatorApp.metricsSystem::start);
+   startupQueue.submit(operatorApp.metricsService::start);
+ }
+}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index aae6df9..ffd4bc3 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -25,10 +25,12 @@ import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VA
import static
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
+import static
org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -37,6 +39,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
public class Utils {
public static Set<String> sanitizeCommaSeparatedStrAsSet(String str) {
@@ -102,6 +105,11 @@ public class Utils {
return
Utils.sanitizeCommaSeparatedStrAsSet(OPERATOR_WATCHED_NAMESPACES.getValue());
}
+ public static List<SparkAppStatusListener> getAppStatusListener() {
+ return ClassLoadingUtils.getStatusListener(
+ SparkAppStatusListener.class,
SPARK_APP_STATUS_LISTENER_CLASS_NAMES.getValue());
+ }
+
/**
* Labels to be applied to all created resources, as a comma-separated string
*
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
new file mode 100644
index 0000000..7cfc26c
--- /dev/null
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/SparkOperatorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.apache.spark.k8s.operator.utils.TestUtils.setConfigKey;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+
+import org.apache.spark.k8s.operator.client.KubernetesClientFactory;
+import org.apache.spark.k8s.operator.config.SparkOperatorConf;
+import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler;
+import org.apache.spark.k8s.operator.metrics.MetricsService;
+import org.apache.spark.k8s.operator.metrics.MetricsSystem;
+import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import
org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
+import org.apache.spark.k8s.operator.probe.ProbeService;
+import org.apache.spark.k8s.operator.reconciler.SparkAppReconciler;
+import org.apache.spark.k8s.operator.utils.Utils;
+
+class SparkOperatorTest {
+
+ /**
+  * Verifies that constructing SparkOperator with dynamic config enabled creates two Operator
+  * instances (SparkApp operator and config monitor) plus one of each supporting component.
+  */
+ @Test
+ void testOperatorConstructionWithDynamicConfigEnabled() {
+   MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+   KubernetesClient mockClient = mock(KubernetesClient.class);
+   // Remember the current value so the finally block can restore it.
+   boolean dynamicConfigEnabled = SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+
+   // All static and construction mocks must be active before new SparkOperator() runs; the
+   // try-with-resources block closes them again when the test ends.
+   try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+           mockStatic(MetricsSystemFactory.class);
+       MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+           mockStatic(KubernetesClientFactory.class);
+       MockedStatic<Utils> mockUtils = mockStatic(Utils.class);
+       MockedConstruction<Operator> operatorConstruction = mockConstruction(Operator.class);
+       MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+           mockConstruction(SparkAppReconciler.class);
+       MockedConstruction<SparkOperatorConfigMapReconciler> configReconcilerConstruction =
+           mockConstruction(SparkOperatorConfigMapReconciler.class);
+       MockedConstruction<ProbeService> probeServiceConstruction =
+           mockConstruction(ProbeService.class);
+       MockedConstruction<MetricsService> metricsServiceConstruction =
+           mockConstruction(MetricsService.class);
+       MockedConstruction<KubernetesMetricsInterceptor> interceptorMockedConstruction =
+           mockConstruction(KubernetesMetricsInterceptor.class)) {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, true);
+     mockMetricsSystemFactory
+         .when(MetricsSystemFactory::createMetricsSystem)
+         .thenReturn(mockMetricsSystem);
+     mockKubernetesClientFactory
+         .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+         .thenReturn(mockClient);
+     mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Collections.singleton("namespace-1"));
+
+     SparkOperator sparkOperator = new SparkOperator();
+     Assertions.assertEquals(1, sparkOperator.registeredSparkControllers.size());
+     // Two operators: the SparkApp operator and the dynamic config monitor.
+     Assertions.assertEquals(2, operatorConstruction.constructed().size());
+     Assertions.assertEquals(1, sparkAppReconcilerConstruction.constructed().size());
+     Assertions.assertEquals(1, configReconcilerConstruction.constructed().size());
+     Assertions.assertEquals(1, probeServiceConstruction.constructed().size());
+     Assertions.assertEquals(1, metricsServiceConstruction.constructed().size());
+     // NOTE(review): expecting one interceptor presumably relies on
+     // KUBERNETES_CLIENT_METRICS_ENABLED being true in the test configuration - confirm.
+     Assertions.assertEquals(1, interceptorMockedConstruction.constructed().size());
+     verify(mockMetricsSystem).registerSource(interceptorMockedConstruction.constructed().get(0));
+
+     // The first constructed Operator is the one the SparkApp reconciler is registered on.
+     SparkAppReconciler sparkAppReconciler = sparkAppReconcilerConstruction.constructed().get(0);
+     Operator sparkAppOperator = operatorConstruction.constructed().get(0);
+     verify(sparkAppOperator).register(eq(sparkAppReconciler), any(Consumer.class));
+   } finally {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, dynamicConfigEnabled);
+   }
+ }
+
+ /**
+  * Verifies that constructing SparkOperator with dynamic config disabled creates only the
+  * SparkApp operator plus one of each supporting component.
+  */
+ @Test
+ void testOperatorConstructionWithDynamicConfigDisabled() {
+   MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+   KubernetesClient mockClient = mock(KubernetesClient.class);
+   // Remember the current value so the finally block can restore it.
+   boolean dynamicConfigEnabled = SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+   // Utils is not mocked in this test, so its static helpers run for real.
+   try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+           mockStatic(MetricsSystemFactory.class);
+       MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+           mockStatic(KubernetesClientFactory.class);
+       MockedConstruction<Operator> operatorConstruction = mockConstruction(Operator.class);
+       MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+           mockConstruction(SparkAppReconciler.class);
+       MockedConstruction<ProbeService> probeServiceConstruction =
+           mockConstruction(ProbeService.class);
+       MockedConstruction<MetricsService> metricsServiceConstruction =
+           mockConstruction(MetricsService.class);
+       MockedConstruction<KubernetesMetricsInterceptor> interceptorMockedConstruction =
+           mockConstruction(KubernetesMetricsInterceptor.class)) {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, false);
+     mockMetricsSystemFactory
+         .when(MetricsSystemFactory::createMetricsSystem)
+         .thenReturn(mockMetricsSystem);
+     mockKubernetesClientFactory
+         .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+         .thenReturn(mockClient);
+     SparkOperator sparkOperator = new SparkOperator();
+     Assertions.assertEquals(1, sparkOperator.registeredSparkControllers.size());
+     // Only the SparkApp operator is built; no config monitor operator is created.
+     Assertions.assertEquals(1, operatorConstruction.constructed().size());
+     Assertions.assertEquals(1, sparkAppReconcilerConstruction.constructed().size());
+     Assertions.assertEquals(1, probeServiceConstruction.constructed().size());
+     Assertions.assertEquals(1, metricsServiceConstruction.constructed().size());
+     Assertions.assertEquals(1, interceptorMockedConstruction.constructed().size());
+     verify(mockMetricsSystem).registerSource(interceptorMockedConstruction.constructed().get(0));
+   } finally {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, dynamicConfigEnabled);
+   }
+ }
+
+ /**
+  * Verifies that updateWatchingNamespaces forwards a changed namespace set to a controller that
+  * allows namespace changes, stores the new set, and returns true.
+  */
+ @Test
+ void testUpdateWatchedNamespacesWithDynamicConfigEnabled() {
+   MetricsSystem mockMetricsSystem = mock(MetricsSystem.class);
+   KubernetesClient mockClient = mock(KubernetesClient.class);
+   var registeredController = mock(RegisteredController.class);
+   when(registeredController.allowsNamespaceChanges()).thenReturn(true);
+   // Remember the current value so the finally block can restore it.
+   boolean dynamicConfigEnabled = SparkOperatorConf.DYNAMIC_CONFIG_ENABLED.getValue();
+
+   try (MockedStatic<MetricsSystemFactory> mockMetricsSystemFactory =
+           mockStatic(MetricsSystemFactory.class);
+       MockedStatic<KubernetesClientFactory> mockKubernetesClientFactory =
+           mockStatic(KubernetesClientFactory.class);
+       MockedStatic<Utils> mockUtils = mockStatic(Utils.class);
+       // Every mocked Operator returns the controller mock when a SparkAppReconciler is
+       // registered, so the operator under test ends up holding registeredController.
+       MockedConstruction<Operator> operatorConstruction =
+           mockConstruction(
+               Operator.class,
+               (mock, context) -> {
+                 when(mock.register(any(SparkAppReconciler.class), any(Consumer.class)))
+                     .thenReturn(registeredController);
+               });
+       MockedConstruction<SparkAppReconciler> sparkAppReconcilerConstruction =
+           mockConstruction(SparkAppReconciler.class);
+       MockedConstruction<SparkOperatorConfigMapReconciler> configReconcilerConstruction =
+           mockConstruction(SparkOperatorConfigMapReconciler.class);
+       MockedConstruction<ProbeService> probeServiceConstruction =
+           mockConstruction(ProbeService.class);
+       MockedConstruction<MetricsService> metricsServiceConstruction =
+           mockConstruction(MetricsService.class);
+       MockedConstruction<KubernetesMetricsInterceptor> interceptorMockedConstruction =
+           mockConstruction(KubernetesMetricsInterceptor.class)) {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, true);
+     mockMetricsSystemFactory
+         .when(MetricsSystemFactory::createMetricsSystem)
+         .thenReturn(mockMetricsSystem);
+     mockKubernetesClientFactory
+         .when(() -> KubernetesClientFactory.buildKubernetesClient(any()))
+         .thenReturn(mockClient);
+     // Start with one watched namespace; an empty set would make the update be rejected.
+     mockUtils.when(Utils::getWatchedNamespaces).thenReturn(Collections.singleton("namespace-1"));
+     SparkOperator sparkOperator = new SparkOperator();
+     Set<String> updatedNamespaces = Set.of("namespace-1", "namespace-2");
+     Assertions.assertTrue(sparkOperator.updateWatchingNamespaces(updatedNamespaces));
+     Assertions.assertEquals(updatedNamespaces, sparkOperator.watchedNamespaces);
+     verify(registeredController).allowsNamespaceChanges();
+     verify(registeredController).changeNamespaces(updatedNamespaces);
+     verifyNoMoreInteractions(registeredController);
+   } finally {
+     setConfigKey(SparkOperatorConf.DYNAMIC_CONFIG_ENABLED, dynamicConfigEnabled);
+   }
+ }
+}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
index 59b3989..1603cfa 100644
---
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/utils/TestUtils.java
@@ -26,9 +26,11 @@ import java.io.File;
import java.util.Map;
import io.fabric8.kubernetes.api.model.ObjectMeta;
+import org.powermock.reflect.Whitebox;
import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.config.ConfigOption;
public class TestUtils {
public static SparkApplication createMockApp(String namespace) {
@@ -60,4 +62,10 @@ public class TestUtils {
public static long calculateElapsedTimeInMills(long startTime) {
return System.currentTimeMillis() - startTime;
}
+
+ /**
+  * Test helper that overwrites the {@code defaultValue} field of a config option via reflection.
+  *
+  * @param configKey config option to modify
+  * @param newValue value to store in the option's {@code defaultValue} field
+  * @param <T> value type of the config option
+  * @return the value {@code configKey.getValue()} reported before the change
+  */
+ public static <T> T setConfigKey(ConfigOption<T> configKey, T newValue) {
+   final T previousValue = configKey.getValue();
+   Whitebox.setInternalState(configKey, "defaultValue", newValue);
+   return previousValue;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]