This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 186746a [SPARK-52086] Upgrade `operator-sdk` to 5.1.1 186746a is described below commit 186746af908092e7ff293e6c00605e9ec1fb59eb Author: Attila Mészáros <a_mesza...@apple.com> AuthorDate: Sat Jun 28 11:07:15 2025 -0700 [SPARK-52086] Upgrade `operator-sdk` to 5.1.1 ### What changes were proposed in this pull request? Change Java Operator SDK version to v5.1 The intention is to keep the functionality the same as the previous version; - https://mvnrepository.com/artifact/io.javaoperatorsdk/java-operator-sdk/5.1.1 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Standard CI should test it. ### Was this patch authored or co-authored using generative AI tooling? No Closes #252 from csviri/josdk-5_1. Authored-by: Attila Mészáros <a_mesza...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- gradle/libs.versions.toml | 2 +- .../apache/spark/k8s/operator/SparkOperator.java | 7 +++++-- .../config/SparkOperatorConfigMapReconciler.java | 22 ++++++++------------ .../operator/reconciler/SparkAppReconciler.java | 22 ++++++++------------ .../reconciler/SparkClusterReconciler.java | 24 ++++++++-------------- .../org/apache/spark/k8s/operator/utils/Utils.java | 24 ++++++++++++++++++++++ .../spark/k8s/operator/probe/HealthProbeTest.java | 13 +----------- 7 files changed, 57 insertions(+), 57 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 507399d..4be9c35 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ [versions] fabric8 = "7.3.1" lombok = "1.18.38" -operator-sdk = "4.9.0" +operator-sdk = "5.1.1" okhttp = "4.12.0" dropwizard-metrics = "4.2.30" spark = "4.0.0" diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java index 00619d7..d0f750a 100644 --- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java @@ -23,6 +23,7 @@ import static org.apache.spark.k8s.operator.utils.Utils.getAppStatusListener; import static org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener; import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -168,8 +169,8 @@ public class SparkOperator { overrider.withKubernetesClient(client); overrider.withStopOnInformerErrorDuringStartup( SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue()); - overrider.withTerminationTimeoutSeconds( - SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()); + overrider.withReconciliationTerminationTimeout( + Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue())); int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue(); if (parallelism > 0) { log.info("Configuring operator with {} reconciliation threads.", parallelism); @@ -187,6 +188,7 @@ public class SparkOperator { overrider.withMetrics(operatorJosdkMetrics); metricsSystem.registerSource(operatorJosdkMetrics); } + overrider.withUseSSAToPatchPrimaryResource(false); } protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider overrider) { @@ -198,6 +200,7 @@ public class SparkOperator { overrider.withInformerStoppedHandler( (informer, ex) -> log.error("Dynamic config informer stopped: operator will not accept config updates.")); + overrider.withUseSSAToPatchPrimaryResource(false); } protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java index 182f423..491057d 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java @@ -19,18 +19,16 @@ package org.apache.spark.k8s.operator.config; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.function.Function; import io.fabric8.kubernetes.api.model.ConfigMap; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -46,10 +44,7 @@ import lombok.extern.slf4j.Slf4j; @ControllerConfiguration @RequiredArgsConstructor @Slf4j -public class SparkOperatorConfigMapReconciler - implements Reconciler<ConfigMap>, - ErrorStatusHandler<ConfigMap>, - EventSourceInitializer<ConfigMap> { +public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> { private final Function<Set<String>, Boolean> namespaceUpdater; private final String operatorNamespace; private final Function<Void, Set<String>> watchedNamespacesGetter; @@ -62,14 +57,15 @@ public class SparkOperatorConfigMapReconciler } @Override - public Map<String, EventSource> 
prepareEventSources(EventSourceContext<ConfigMap> context) { - EventSource configMapEventSource = + public List<EventSource<?, ConfigMap>> prepareEventSources( + EventSourceContext<ConfigMap> context) { + var configMapEventSource = new InformerEventSource<>( - InformerConfiguration.from(ConfigMap.class, context) - .withNamespaces(operatorNamespace) + InformerEventSourceConfiguration.from(ConfigMap.class, ConfigMap.class) + .withNamespaces(Set.of(operatorNamespace)) .build(), context); - return EventSourceInitializer.nameEventSources(configMapEventSource); + return List.of(configMapEventSource); } @Override diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java index 89c5213..9b1323b 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java @@ -21,28 +21,25 @@ package org.apache.spark.k8s.operator.reconciler; import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper; import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import io.fabric8.kubernetes.api.model.Pod; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import 
io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -74,11 +71,7 @@ import org.apache.spark.k8s.operator.utils.SparkAppStatusUtils; @ControllerConfiguration @Slf4j @RequiredArgsConstructor -public class SparkAppReconciler - implements Reconciler<SparkApplication>, - ErrorStatusHandler<SparkApplication>, - EventSourceInitializer<SparkApplication>, - Cleaner<SparkApplication> { +public class SparkAppReconciler implements Reconciler<SparkApplication>, Cleaner<SparkApplication> { private final SparkAppSubmissionWorker submissionWorker; private final SparkAppStatusRecorder sparkAppStatusRecorder; private final SentinelManager<SparkApplication> sentinelManager; @@ -135,16 +128,17 @@ public class SparkAppReconciler } @Override - public Map<String, EventSource> prepareEventSources( + public List<EventSource<?, SparkApplication>> prepareEventSources( EventSourceContext<SparkApplication> context) { EventSource podEventSource = new InformerEventSource<>( - InformerConfiguration.from(Pod.class, context) - .withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME)) + InformerEventSourceConfiguration.from(Pod.class, SparkApplication.class) + .withSecondaryToPrimaryMapper( + 
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME)) .withLabelSelector(commonResourceLabelsStr()) .build(), context); - return EventSourceInitializer.nameEventSources(podEventSource); + return List.of(podEventSource); } protected List<AppReconcileStep> getReconcileSteps(final SparkApplication app) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java index fde1a98..f99d558 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java @@ -19,32 +19,29 @@ package org.apache.spark.k8s.operator.reconciler; +import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME; import static org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue; +import static org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper; import static org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr; import java.util.ArrayList; import java.util.List; -import java.util.Map; import io.fabric8.kubernetes.api.model.Pod; -import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import 
io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.spark.k8s.operator.Constants; import org.apache.spark.k8s.operator.SparkCluster; import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker; import org.apache.spark.k8s.operator.context.SparkClusterContext; @@ -61,11 +58,7 @@ import org.apache.spark.k8s.operator.utils.SparkClusterStatusRecorder; @ControllerConfiguration @Slf4j @RequiredArgsConstructor -public class SparkClusterReconciler - implements Reconciler<SparkCluster>, - ErrorStatusHandler<SparkCluster>, - EventSourceInitializer<SparkCluster>, - Cleaner<SparkCluster> { +public class SparkClusterReconciler implements Reconciler<SparkCluster>, Cleaner<SparkCluster> { private final SparkClusterSubmissionWorker submissionWorker; private final SparkClusterStatusRecorder sparkClusterStatusRecorder; private final SentinelManager<SparkCluster> sentinelManager; @@ -118,16 +111,17 @@ public class SparkClusterReconciler } @Override - public Map<String, EventSource> prepareEventSources(EventSourceContext<SparkCluster> context) { + public List<EventSource<?, SparkCluster>> prepareEventSources( + EventSourceContext<SparkCluster> context) { EventSource podEventSource = new InformerEventSource<>( - InformerConfiguration.from(Pod.class, context) + InformerEventSourceConfiguration.from(Pod.class, SparkCluster.class) .withSecondaryToPrimaryMapper( - Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME)) + basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME)) 
.withLabelSelector(commonResourceLabelsStr()) .build(), context); - return EventSourceInitializer.nameEventSources(podEventSource); + return List.of(podEventSource); } protected List<ClusterReconcileStep> getReconcileSteps(final SparkCluster cluster) { diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java index 28c1cf6..7ab3574 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java @@ -38,6 +38,9 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import org.apache.commons.lang3.StringUtils; import org.apache.spark.k8s.operator.Constants; @@ -144,4 +147,25 @@ public final class Utils { public static String commonResourceLabelsStr() { return labelsAsStr(commonManagedResourceLabels()); } + + public static <T extends HasMetadata> + SecondaryToPrimaryMapper<T> basicLabelSecondaryToPrimaryMapper(String nameKey) { + return resource -> { + final var metadata = resource.getMetadata(); + if (metadata == null) { + return Collections.emptySet(); + } else { + final var map = metadata.getLabels(); + if (map == null) { + return Collections.emptySet(); + } + var name = map.get(nameKey); + if (name == null) { + return Collections.emptySet(); + } + var namespace = resource.getMetadata().getNamespace(); + return Set.of(new ResourceID(name, namespace)); + } + }; + } } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java index a8b4587..958a8d2 100644 --- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java @@ -36,7 +36,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.RuntimeInfo; -import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.Status; @@ -190,16 +189,6 @@ class HealthProbeTest { } })); - return new InformerWrappingEventSourceHealthIndicator() { - @Override - public Map<String, InformerHealthIndicator> informerHealthIndicators() { - return informers; - } - - @Override - public ResourceConfiguration getInformerConfiguration() { - return null; - } - }; + return () -> informers; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org