This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 8fb828803d0a30acc8e13f3c517c0bf8aecda337
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon May 25 16:42:54 2026 +0300

    [hotfix] Clean up FlinkBlueGreenDeploymentController
---
 .../FlinkBlueGreenDeploymentController.java        | 67 +++++++++-------------
 .../controller/FlinkBlueGreenDeployments.java      |  6 +-
 .../service/FlinkResourceContextFactory.java       | 66 ++++++++++-----------
 .../operator/utils/EventSourceUtils.java           | 15 +++++
 4 files changed, 77 insertions(+), 77 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
index a2257447..e68d91ca 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
-import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -30,15 +29,12 @@ import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
-import 
io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,26 +46,35 @@ import static 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDepl
 /**
  * Controller that runs the main reconcile loop for Flink Blue/Green 
deployments.
  *
- * <p>State Machine Flow
+ * <h2>State Machine Flow</h2>
  *
- * <p>Deployment States
+ * <p>Deployment states:
  *
- * <p>1. INITIALIZING_BLUE - First-time deployment setup 2. ACTIVE_BLUE - Blue 
environment serving
- * traffic, monitoring for updates 3. TRANSITIONING_TO_GREEN - Deploying Green 
environment while
- * Blue serves traffic 4. ACTIVE_GREEN - Green environment serving traffic, 
monitoring for updates
- * 5. TRANSITIONING_TO_BLUE - Deploying Blue environment while Green serves 
traffic
+ * <ol>
+ *   <li>{@code INITIALIZING_BLUE}: First-time deployment setup.
+ *   <li>{@code ACTIVE_BLUE}: Blue environment serving traffic, monitoring for 
updates.
+ *   <li>{@code TRANSITIONING_TO_GREEN}: Deploying Green environment while 
Blue serves traffic.
+ *   <li>{@code ACTIVE_GREEN}: Green environment serving traffic, monitoring 
for updates.
+ *   <li>{@code TRANSITIONING_TO_BLUE}: Deploying Blue environment while Green 
serves traffic.
+ * </ol>
  *
- * <p>Orchestration Process
+ * <h2>Orchestration Process</h2>
  *
- * <p>FlinkBlueGreenDeploymentController.reconcile() 1. Create 
BlueGreenContext with current
- * deployment state 2. Query StateHandlerRegistry for appropriate handler 3. 
Delegate to specific
- * StateHandler.handle(context) 4. StateHandler invokes 
BlueGreenDeploymentService operations 5.
- * Return UpdateControl with next reconciliation schedule
+ * <p>{@link #reconcile(FlinkBlueGreenDeployment, Context)} performs:
+ *
+ * <ol>
+ *   <li>Create a {@link BlueGreenContext} with the current deployment state.
+ *   <li>Query {@link BlueGreenStateHandlerRegistry} for the appropriate 
handler.
+ *   <li>Delegate to {@link BlueGreenStateHandler#handle(BlueGreenContext)}.
+ *   <li>The handler invokes {@link BlueGreenDeploymentService} operations.
+ *   <li>Return an {@link UpdateControl} with the next reconciliation schedule.
+ * </ol>
  */
 @ControllerConfiguration
 public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueGreenDeployment> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDeploymentController.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FlinkBlueGreenDeploymentController.class);
 
     private final FlinkResourceContextFactory ctxFactory;
     private final BlueGreenStateHandlerRegistry handlerRegistry;
@@ -92,18 +97,7 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
     public List<EventSource<?, FlinkBlueGreenDeployment>> prepareEventSources(
             EventSourceContext<FlinkBlueGreenDeployment> context) {
         List<EventSource<?, FlinkBlueGreenDeployment>> eventSources = new 
ArrayList<>();
-
-        InformerEventSourceConfiguration<FlinkDeployment> config =
-                InformerEventSourceConfiguration.from(
-                                FlinkDeployment.class, 
FlinkBlueGreenDeployment.class)
-                        .withSecondaryToPrimaryMapper(
-                                
Mappers.fromOwnerReferences(context.getPrimaryResourceClass()))
-                        .withNamespacesInheritedFromController()
-                        .withFollowControllerNamespacesChanges(true)
-                        .build();
-
-        eventSources.add(new InformerEventSource<>(config, context));
-
+        
eventSources.add(EventSourceUtils.getBlueGreenFlinkDeploymentInformerEventSource(context));
         if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
             
eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context));
         }
@@ -119,12 +113,8 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
 
         if (deploymentStatus == null) {
             var context =
-                    new BlueGreenContext(
-                            bgDeployment,
-                            new FlinkBlueGreenDeploymentStatus(),
-                            josdkContext,
-                            null,
-                            ctxFactory);
+                    ctxFactory.getBlueGreenContext(
+                            bgDeployment, new 
FlinkBlueGreenDeploymentStatus(), josdkContext, null);
             UpdateControl<FlinkBlueGreenDeployment> updateControl =
                     BlueGreenDeploymentService.patchStatusUpdateControl(
                                     context, INITIALIZING_BLUE, null, null)
@@ -134,15 +124,14 @@ public class FlinkBlueGreenDeploymentController 
implements Reconciler<FlinkBlueG
         } else {
             FlinkBlueGreenDeploymentState currentState = 
deploymentStatus.getBlueGreenState();
             var context =
-                    new BlueGreenContext(
+                    ctxFactory.getBlueGreenContext(
                             bgDeployment,
                             deploymentStatus,
                             josdkContext,
                             currentState == INITIALIZING_BLUE
                                     ? null
                                     : 
FlinkBlueGreenDeployments.fromSecondaryResources(
-                                            josdkContext),
-                            ctxFactory);
+                                            josdkContext));
 
             LOG.debug(
                     "Processing state: {} for deployment: {}",
@@ -155,8 +144,4 @@ public class FlinkBlueGreenDeploymentController implements 
Reconciler<FlinkBlueG
             return updateControl;
         }
     }
-
-    public static void logAndThrow(String message) {
-        throw new RuntimeException(message);
-    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
index 9f074df4..62899a98 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java
@@ -51,7 +51,7 @@ public class FlinkBlueGreenDeployments {
                 context.getSecondaryResources(FlinkDeployment.class);
 
         if (secondaryResources.isEmpty() || secondaryResources.size() > 2) {
-            FlinkBlueGreenDeploymentController.logAndThrow(
+            throw new RuntimeException(
                     "Unexpected number of dependent deployments: " + 
secondaryResources.size());
         }
 
@@ -63,13 +63,13 @@ public class FlinkBlueGreenDeployments {
 
             if (flinkBlueGreenDeploymentType == BlueGreenDeploymentType.BLUE) {
                 if (flinkBlueGreenDeployments.getFlinkDeploymentBlue() != 
null) {
-                    FlinkBlueGreenDeploymentController.logAndThrow(
+                    throw new RuntimeException(
                             "Detected multiple Dependent Deployments of type 
BLUE");
                 }
                 
flinkBlueGreenDeployments.setFlinkDeploymentBlue(dependentDeployment);
             } else {
                 if (flinkBlueGreenDeployments.getFlinkDeploymentGreen() != 
null) {
-                    FlinkBlueGreenDeploymentController.logAndThrow(
+                    throw new RuntimeException(
                             "Detected multiple Dependent Deployments of type 
GREEN");
                 }
                 
flinkBlueGreenDeployments.setFlinkDeploymentGreen(dependentDeployment);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
index 9cc9b489..7f153b9d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java
@@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.service;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
 import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import 
org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobContext;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext;
+import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
@@ -98,39 +102,40 @@ public class FlinkResourceContextFactory {
                 configManager);
     }
 
+    public BlueGreenContext getBlueGreenContext(
+            FlinkBlueGreenDeployment bgDeployment,
+            FlinkBlueGreenDeploymentStatus deploymentStatus,
+            Context<FlinkBlueGreenDeployment> josdkContext,
+            FlinkBlueGreenDeployments deployments) {
+        return new BlueGreenContext(
+                bgDeployment, deploymentStatus, josdkContext, deployments, 
this);
+    }
+
+    @SuppressWarnings("unchecked")
     public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> 
getResourceContext(
-            CR resource, Context josdkContext) {
+            CR resource, Context<CR> josdkContext) {
         var resMg =
                 resourceMetricGroups.computeIfAbsent(
                         Tuple2.of(resource.getClass(), 
ResourceID.fromResource(resource)),
                         r ->
                                 OperatorMetricUtils.createResourceMetricGroup(
                                         operatorMetricGroup, configManager, 
resource));
-        String jobId = null;
-        if (resource.getStatus() != null) {
-            if (resource.getStatus().getJobStatus() != null) {
-                jobId = resource.getStatus().getJobStatus().getJobId();
-            }
-        }
-        if (resource instanceof FlinkDeployment) {
-            var flinkDep = (FlinkDeployment) resource;
-            var resourceId = ResourceID.fromResource(flinkDep);
-            var flinkDepJobId = jobId;
+        if (resource instanceof FlinkDeployment flinkDeployment) {
+            var resourceId = ResourceID.fromResource(flinkDeployment);
             return (FlinkResourceContext<CR>)
                     new FlinkDeploymentContext(
-                            flinkDep,
+                            flinkDeployment,
                             josdkContext,
                             resMg,
                             configManager,
                             this::getFlinkService,
                             lastRecordedExceptionCache.computeIfAbsent(
                                     resourceId, id -> new 
ExceptionCacheEntry()));
-        } else if (resource instanceof FlinkSessionJob) {
+        } else if (resource instanceof FlinkSessionJob flinkSessionJob) {
             var resourceId = ResourceID.fromResource(resource);
-            var flinkSessionJobId = jobId;
             return (FlinkResourceContext<CR>)
                     new FlinkSessionJobContext(
-                            (FlinkSessionJob) resource,
+                            flinkSessionJob,
                             josdkContext,
                             resMg,
                             configManager,
@@ -146,24 +151,19 @@ public class FlinkResourceContextFactory {
     @VisibleForTesting
     protected FlinkService getFlinkService(FlinkResourceContext<?> ctx) {
         var deploymentMode = ctx.getDeploymentMode();
-        switch (deploymentMode) {
-            case NATIVE:
-                return new NativeFlinkService(
-                        ctx.getKubernetesClient(),
-                        artifactManager,
-                        clientExecutorService,
-                        ctx.getOperatorConfig(),
-                        eventRecorder);
-            case STANDALONE:
-                return new StandaloneFlinkService(
-                        ctx.getKubernetesClient(),
-                        artifactManager,
-                        clientExecutorService,
-                        ctx.getOperatorConfig());
-            default:
-                throw new UnsupportedOperationException(
-                        String.format("Unsupported deployment mode: %s", 
deploymentMode));
-        }
+        return switch (deploymentMode) {
+            case NATIVE -> new NativeFlinkService(
+                    ctx.getKubernetesClient(),
+                    artifactManager,
+                    clientExecutorService,
+                    ctx.getOperatorConfig(),
+                    eventRecorder);
+            case STANDALONE -> new StandaloneFlinkService(
+                    ctx.getKubernetesClient(),
+                    artifactManager,
+                    clientExecutorService,
+                    ctx.getOperatorConfig());
+        };
     }
 
     public <CR extends AbstractFlinkResource<?, ?>> void cleanup(CR flinkApp) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index e3f22d47..f8665118 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -39,6 +39,7 @@ import 
io.javaoperatorsdk.operator.processing.event.source.EventSource;
 import 
io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
 import 
io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
 import 
io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 
 import java.util.Collections;
 import java.util.List;
@@ -130,6 +131,20 @@ public class EventSourceUtils {
         return new InformerEventSource<>(configuration, context);
     }
 
+    public static InformerEventSource<FlinkDeployment, 
FlinkBlueGreenDeployment>
+            getBlueGreenFlinkDeploymentInformerEventSource(
+                    EventSourceContext<FlinkBlueGreenDeployment> context) {
+        var configuration =
+                InformerEventSourceConfiguration.from(
+                                FlinkDeployment.class, 
FlinkBlueGreenDeployment.class)
+                        .withSecondaryToPrimaryMapper(
+                                
Mappers.fromOwnerReferences(context.getPrimaryResourceClass()))
+                        .withNamespacesInheritedFromController()
+                        .withFollowControllerNamespacesChanges(true)
+                        .build();
+        return new InformerEventSource<>(configuration, context);
+    }
+
     public static InformerEventSource<?, FlinkBlueGreenDeployment>
             getBlueGreenIngressInformerEventSource(
                     EventSourceContext<FlinkBlueGreenDeployment> context) {

Reply via email to