This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 186746a  [SPARK-52086] Upgrade `operator-sdk` to 5.1.1
186746a is described below

commit 186746af908092e7ff293e6c00605e9ec1fb59eb
Author: Attila Mészáros <a_mesza...@apple.com>
AuthorDate: Sat Jun 28 11:07:15 2025 -0700

    [SPARK-52086] Upgrade `operator-sdk` to 5.1.1
    
    ### What changes were proposed in this pull request?
    
    Change Java Operator SDK version to v5.1
    
    The intention is to keep the functionality the same as the previous version;
    
    - 
https://mvnrepository.com/artifact/io.javaoperatorsdk/java-operator-sdk/5.1.1
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Standard CI should test it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #252 from csviri/josdk-5_1.
    
    Authored-by: Attila Mészáros <a_mesza...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 gradle/libs.versions.toml                          |  2 +-
 .../apache/spark/k8s/operator/SparkOperator.java   |  7 +++++--
 .../config/SparkOperatorConfigMapReconciler.java   | 22 ++++++++------------
 .../operator/reconciler/SparkAppReconciler.java    | 22 ++++++++------------
 .../reconciler/SparkClusterReconciler.java         | 24 ++++++++--------------
 .../org/apache/spark/k8s/operator/utils/Utils.java | 24 ++++++++++++++++++++++
 .../spark/k8s/operator/probe/HealthProbeTest.java  | 13 +-----------
 7 files changed, 57 insertions(+), 57 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 507399d..4be9c35 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,7 +17,7 @@
 [versions]
 fabric8 = "7.3.1"
 lombok = "1.18.38"
-operator-sdk = "4.9.0"
+operator-sdk = "5.1.1"
 okhttp = "4.12.0"
 dropwizard-metrics = "4.2.30"
 spark = "4.0.0"
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 00619d7..d0f750a 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -23,6 +23,7 @@ import static 
org.apache.spark.k8s.operator.utils.Utils.getAppStatusListener;
 import static 
org.apache.spark.k8s.operator.utils.Utils.getClusterStatusListener;
 import static org.apache.spark.k8s.operator.utils.Utils.getWatchedNamespaces;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -168,8 +169,8 @@ public class SparkOperator {
     overrider.withKubernetesClient(client);
     overrider.withStopOnInformerErrorDuringStartup(
         SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
-    overrider.withTerminationTimeoutSeconds(
-        SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
+    overrider.withReconciliationTerminationTimeout(
+        
Duration.ofSeconds(SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue()));
     int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
     if (parallelism > 0) {
       log.info("Configuring operator with {} reconciliation threads.", 
parallelism);
@@ -187,6 +188,7 @@ public class SparkOperator {
       overrider.withMetrics(operatorJosdkMetrics);
       metricsSystem.registerSource(operatorJosdkMetrics);
     }
+    overrider.withUseSSAToPatchPrimaryResource(false);
   }
 
   protected void overrideConfigMonitorConfigs(ConfigurationServiceOverrider 
overrider) {
@@ -198,6 +200,7 @@ public class SparkOperator {
     overrider.withInformerStoppedHandler(
         (informer, ex) ->
             log.error("Dynamic config informer stopped: operator will not 
accept config updates."));
+    overrider.withUseSSAToPatchPrimaryResource(false);
   }
 
   protected void overrideControllerConfigs(ControllerConfigurationOverrider<?> 
overrider) {
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
index 182f423..491057d 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConfigMapReconciler.java
@@ -19,18 +19,16 @@
 
 package org.apache.spark.k8s.operator.config;
 
-import java.util.Map;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
@@ -46,10 +44,7 @@ import lombok.extern.slf4j.Slf4j;
 @ControllerConfiguration
 @RequiredArgsConstructor
 @Slf4j
-public class SparkOperatorConfigMapReconciler
-    implements Reconciler<ConfigMap>,
-        ErrorStatusHandler<ConfigMap>,
-        EventSourceInitializer<ConfigMap> {
+public class SparkOperatorConfigMapReconciler implements Reconciler<ConfigMap> 
{
   private final Function<Set<String>, Boolean> namespaceUpdater;
   private final String operatorNamespace;
   private final Function<Void, Set<String>> watchedNamespacesGetter;
@@ -62,14 +57,15 @@ public class SparkOperatorConfigMapReconciler
   }
 
   @Override
-  public Map<String, EventSource> 
prepareEventSources(EventSourceContext<ConfigMap> context) {
-    EventSource configMapEventSource =
+  public List<EventSource<?, ConfigMap>> prepareEventSources(
+      EventSourceContext<ConfigMap> context) {
+    var configMapEventSource =
         new InformerEventSource<>(
-            InformerConfiguration.from(ConfigMap.class, context)
-                .withNamespaces(operatorNamespace)
+            InformerEventSourceConfiguration.from(ConfigMap.class, 
ConfigMap.class)
+                .withNamespaces(Set.of(operatorNamespace))
                 .build(),
             context);
-    return EventSourceInitializer.nameEventSources(configMapEventSource);
+    return List.of(configMapEventSource);
   }
 
   @Override
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
index 89c5213..9b1323b 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkAppReconciler.java
@@ -21,28 +21,25 @@ package org.apache.spark.k8s.operator.reconciler;
 
 import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
 import static 
org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
+import static 
org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
 import static 
org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.Pod;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -74,11 +71,7 @@ import 
org.apache.spark.k8s.operator.utils.SparkAppStatusUtils;
 @ControllerConfiguration
 @Slf4j
 @RequiredArgsConstructor
-public class SparkAppReconciler
-    implements Reconciler<SparkApplication>,
-        ErrorStatusHandler<SparkApplication>,
-        EventSourceInitializer<SparkApplication>,
-        Cleaner<SparkApplication> {
+public class SparkAppReconciler implements Reconciler<SparkApplication>, 
Cleaner<SparkApplication> {
   private final SparkAppSubmissionWorker submissionWorker;
   private final SparkAppStatusRecorder sparkAppStatusRecorder;
   private final SentinelManager<SparkApplication> sentinelManager;
@@ -135,16 +128,17 @@ public class SparkAppReconciler
   }
 
   @Override
-  public Map<String, EventSource> prepareEventSources(
+  public List<EventSource<?, SparkApplication>> prepareEventSources(
       EventSourceContext<SparkApplication> context) {
     EventSource podEventSource =
         new InformerEventSource<>(
-            InformerConfiguration.from(Pod.class, context)
-                
.withSecondaryToPrimaryMapper(Mappers.fromLabel(LABEL_SPARK_APPLICATION_NAME))
+            InformerEventSourceConfiguration.from(Pod.class, 
SparkApplication.class)
+                .withSecondaryToPrimaryMapper(
+                    
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
                 .withLabelSelector(commonResourceLabelsStr())
                 .build(),
             context);
-    return EventSourceInitializer.nameEventSources(podEventSource);
+    return List.of(podEventSource);
   }
 
   protected List<AppReconcileStep> getReconcileSteps(final SparkApplication 
app) {
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
index fde1a98..f99d558 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterReconciler.java
@@ -19,32 +19,29 @@
 
 package org.apache.spark.k8s.operator.reconciler;
 
+import static 
org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME;
 import static 
org.apache.spark.k8s.operator.reconciler.ReconcileProgress.completeAndDefaultRequeue;
+import static 
org.apache.spark.k8s.operator.utils.Utils.basicLabelSecondaryToPrimaryMapper;
 import static 
org.apache.spark.k8s.operator.utils.Utils.commonResourceLabelsStr;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.Pod;
-import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
+import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
-import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.spark.k8s.operator.Constants;
 import org.apache.spark.k8s.operator.SparkCluster;
 import org.apache.spark.k8s.operator.SparkClusterSubmissionWorker;
 import org.apache.spark.k8s.operator.context.SparkClusterContext;
@@ -61,11 +58,7 @@ import 
org.apache.spark.k8s.operator.utils.SparkClusterStatusRecorder;
 @ControllerConfiguration
 @Slf4j
 @RequiredArgsConstructor
-public class SparkClusterReconciler
-    implements Reconciler<SparkCluster>,
-        ErrorStatusHandler<SparkCluster>,
-        EventSourceInitializer<SparkCluster>,
-        Cleaner<SparkCluster> {
+public class SparkClusterReconciler implements Reconciler<SparkCluster>, 
Cleaner<SparkCluster> {
   private final SparkClusterSubmissionWorker submissionWorker;
   private final SparkClusterStatusRecorder sparkClusterStatusRecorder;
   private final SentinelManager<SparkCluster> sentinelManager;
@@ -118,16 +111,17 @@ public class SparkClusterReconciler
   }
 
   @Override
-  public Map<String, EventSource> 
prepareEventSources(EventSourceContext<SparkCluster> context) {
+  public List<EventSource<?, SparkCluster>> prepareEventSources(
+      EventSourceContext<SparkCluster> context) {
     EventSource podEventSource =
         new InformerEventSource<>(
-            InformerConfiguration.from(Pod.class, context)
+            InformerEventSourceConfiguration.from(Pod.class, 
SparkCluster.class)
                 .withSecondaryToPrimaryMapper(
-                    Mappers.fromLabel(Constants.LABEL_SPARK_APPLICATION_NAME))
+                    
basicLabelSecondaryToPrimaryMapper(LABEL_SPARK_APPLICATION_NAME))
                 .withLabelSelector(commonResourceLabelsStr())
                 .build(),
             context);
-    return EventSourceInitializer.nameEventSources(podEventSource);
+    return List.of(podEventSource);
   }
 
   protected List<ClusterReconcileStep> getReconcileSteps(final SparkCluster 
cluster) {
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
index 28c1cf6..7ab3574 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/Utils.java
@@ -38,6 +38,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import 
io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.spark.k8s.operator.Constants;
@@ -144,4 +147,25 @@ public final class Utils {
   public static String commonResourceLabelsStr() {
     return labelsAsStr(commonManagedResourceLabels());
   }
+
+  public static <T extends HasMetadata>
+      SecondaryToPrimaryMapper<T> basicLabelSecondaryToPrimaryMapper(String 
nameKey) {
+    return resource -> {
+      final var metadata = resource.getMetadata();
+      if (metadata == null) {
+        return Collections.emptySet();
+      } else {
+        final var map = metadata.getLabels();
+        if (map == null) {
+          return Collections.emptySet();
+        }
+        var name = map.get(nameKey);
+        if (name == null) {
+          return Collections.emptySet();
+        }
+        var namespace = resource.getMetadata().getNamespace();
+        return Set.of(new ResourceID(name, namespace));
+      }
+    };
+  }
 }
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
index a8b4587..958a8d2 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/probe/HealthProbeTest.java
@@ -36,7 +36,6 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RuntimeInfo;
-import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
 import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
 import 
io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
 import io.javaoperatorsdk.operator.health.Status;
@@ -190,16 +189,6 @@ class HealthProbeTest {
                   }
                 }));
 
-    return new InformerWrappingEventSourceHealthIndicator() {
-      @Override
-      public Map<String, InformerHealthIndicator> informerHealthIndicators() {
-        return informers;
-      }
-
-      @Override
-      public ResourceConfiguration getInformerConfiguration() {
-        return null;
-      }
-    };
+    return () -> informers;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to