This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 8fb828803d0a30acc8e13f3c517c0bf8aecda337 Author: Dennis-Mircea Ciupitu <[email protected]> AuthorDate: Mon May 25 16:42:54 2026 +0300 [hotfix] Clean up FlinkBlueGreenDeploymentController --- .../FlinkBlueGreenDeploymentController.java | 67 +++++++++------------- .../controller/FlinkBlueGreenDeployments.java | 6 +- .../service/FlinkResourceContextFactory.java | 66 ++++++++++----------- .../operator/utils/EventSourceUtils.java | 15 +++++ 4 files changed, 77 insertions(+), 77 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java index a2257447..e68d91ca 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.controller; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; -import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState; import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; @@ -30,15 +29,12 @@ import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory; import org.apache.flink.kubernetes.operator.utils.EventSourceUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; -import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.event.source.EventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,26 +46,35 @@ import static org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDepl /** * Controller that runs the main reconcile loop for Flink Blue/Green deployments. * - * <p>State Machine Flow + * <h2>State Machine Flow</h2> * - * <p>Deployment States + * <p>Deployment states: * - * <p>1. INITIALIZING_BLUE - First-time deployment setup 2. ACTIVE_BLUE - Blue environment serving - * traffic, monitoring for updates 3. TRANSITIONING_TO_GREEN - Deploying Green environment while - * Blue serves traffic 4. ACTIVE_GREEN - Green environment serving traffic, monitoring for updates - * 5. TRANSITIONING_TO_BLUE - Deploying Blue environment while Green serves traffic + * <ol> + * <li>{@code INITIALIZING_BLUE}: First-time deployment setup. + * <li>{@code ACTIVE_BLUE}: Blue environment serving traffic, monitoring for updates. + * <li>{@code TRANSITIONING_TO_GREEN}: Deploying Green environment while Blue serves traffic. + * <li>{@code ACTIVE_GREEN}: Green environment serving traffic, monitoring for updates. + * <li>{@code TRANSITIONING_TO_BLUE}: Deploying Blue environment while Green serves traffic. + * </ol> * - * <p>Orchestration Process + * <h2>Orchestration Process</h2> * - * <p>FlinkBlueGreenDeploymentController.reconcile() 1. Create BlueGreenContext with current - * deployment state 2. Query StateHandlerRegistry for appropriate handler 3. Delegate to specific - * StateHandler.handle(context) 4. StateHandler invokes BlueGreenDeploymentService operations 5. - * Return UpdateControl with next reconciliation schedule + * <p>{@link #reconcile(FlinkBlueGreenDeployment, Context)} performs: + * + * <ol> + * <li>Create a {@link BlueGreenContext} with the current deployment state. + * <li>Query {@link BlueGreenStateHandlerRegistry} for the appropriate handler. + * <li>Delegate to {@link BlueGreenStateHandler#handle(BlueGreenContext)}. + * <li>The handler invokes {@link BlueGreenDeploymentService} operations. + * <li>Return an {@link UpdateControl} with the next reconciliation schedule. + * </ol> */ @ControllerConfiguration public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueGreenDeployment> { - private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class); + private static final Logger LOG = + LoggerFactory.getLogger(FlinkBlueGreenDeploymentController.class); private final FlinkResourceContextFactory ctxFactory; private final BlueGreenStateHandlerRegistry handlerRegistry; @@ -92,18 +97,7 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG public List<EventSource<?, FlinkBlueGreenDeployment>> prepareEventSources( EventSourceContext<FlinkBlueGreenDeployment> context) { List<EventSource<?, FlinkBlueGreenDeployment>> eventSources = new ArrayList<>(); - - InformerEventSourceConfiguration<FlinkDeployment> config = - InformerEventSourceConfiguration.from( - FlinkDeployment.class, FlinkBlueGreenDeployment.class) - .withSecondaryToPrimaryMapper( - Mappers.fromOwnerReferences(context.getPrimaryResourceClass())) - .withNamespacesInheritedFromController() - .withFollowControllerNamespacesChanges(true) - .build(); - - eventSources.add(new InformerEventSource<>(config, context)); - + eventSources.add(EventSourceUtils.getBlueGreenFlinkDeploymentInformerEventSource(context)); if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) { eventSources.add(EventSourceUtils.getBlueGreenIngressInformerEventSource(context)); } @@ -119,12 +113,8 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG if (deploymentStatus == null) { var context = - new BlueGreenContext( - bgDeployment, - new FlinkBlueGreenDeploymentStatus(), - josdkContext, - null, - ctxFactory); + ctxFactory.getBlueGreenContext( + bgDeployment, new FlinkBlueGreenDeploymentStatus(), josdkContext, null); UpdateControl<FlinkBlueGreenDeployment> updateControl = BlueGreenDeploymentService.patchStatusUpdateControl( context, INITIALIZING_BLUE, null, null) @@ -134,15 +124,14 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG } else { FlinkBlueGreenDeploymentState currentState = deploymentStatus.getBlueGreenState(); var context = - new BlueGreenContext( + ctxFactory.getBlueGreenContext( bgDeployment, deploymentStatus, josdkContext, currentState == INITIALIZING_BLUE ? null : FlinkBlueGreenDeployments.fromSecondaryResources( - josdkContext), - ctxFactory); + josdkContext)); LOG.debug( "Processing state: {} for deployment: {}", @@ -155,8 +144,4 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG return updateControl; } } - - public static void logAndThrow(String message) { - throw new RuntimeException(message); - } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java index 9f074df4..62899a98 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeployments.java @@ -51,7 +51,7 @@ public class FlinkBlueGreenDeployments { context.getSecondaryResources(FlinkDeployment.class); if (secondaryResources.isEmpty() || secondaryResources.size() > 2) { - FlinkBlueGreenDeploymentController.logAndThrow( + throw new RuntimeException( "Unexpected number of dependent deployments: " + secondaryResources.size()); } @@ -63,13 +63,13 @@ public class FlinkBlueGreenDeployments { if (flinkBlueGreenDeploymentType == BlueGreenDeploymentType.BLUE) { if (flinkBlueGreenDeployments.getFlinkDeploymentBlue() != null) { - FlinkBlueGreenDeploymentController.logAndThrow( + throw new RuntimeException( "Detected multiple Dependent Deployments of type BLUE"); } flinkBlueGreenDeployments.setFlinkDeploymentBlue(dependentDeployment); } else { if (flinkBlueGreenDeployments.getFlinkDeploymentGreen() != null) { - FlinkBlueGreenDeploymentController.logAndThrow( + throw new RuntimeException( "Detected multiple Dependent Deployments of type GREEN"); } flinkBlueGreenDeployments.setFlinkDeploymentGreen(dependentDeployment); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java index 9cc9b489..7f153b9d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java @@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.service; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; +import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus; import org.apache.flink.kubernetes.operator.artifact.ArtifactManager; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments; import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobContext; import org.apache.flink.kubernetes.operator.controller.FlinkStateSnapshotContext; +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup; import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup; import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils; @@ -98,39 +102,40 @@ public class FlinkResourceContextFactory { configManager); } + public BlueGreenContext getBlueGreenContext( + FlinkBlueGreenDeployment bgDeployment, + FlinkBlueGreenDeploymentStatus deploymentStatus, + Context<FlinkBlueGreenDeployment> josdkContext, + FlinkBlueGreenDeployments deployments) { + return new BlueGreenContext( + bgDeployment, deploymentStatus, josdkContext, deployments, this); + } + + @SuppressWarnings("unchecked") public <CR extends AbstractFlinkResource<?, ?>> FlinkResourceContext<CR> getResourceContext( - CR resource, Context josdkContext) { + CR resource, Context<CR> josdkContext) { var resMg = resourceMetricGroups.computeIfAbsent( Tuple2.of(resource.getClass(), ResourceID.fromResource(resource)), r -> OperatorMetricUtils.createResourceMetricGroup( operatorMetricGroup, configManager, resource)); - String jobId = null; - if (resource.getStatus() != null) { - if (resource.getStatus().getJobStatus() != null) { - jobId = resource.getStatus().getJobStatus().getJobId(); - } - } - if (resource instanceof FlinkDeployment) { - var flinkDep = (FlinkDeployment) resource; - var resourceId = ResourceID.fromResource(flinkDep); - var flinkDepJobId = jobId; + if (resource instanceof FlinkDeployment flinkDeployment) { + var resourceId = ResourceID.fromResource(flinkDeployment); return (FlinkResourceContext<CR>) new FlinkDeploymentContext( - flinkDep, + flinkDeployment, josdkContext, resMg, configManager, this::getFlinkService, lastRecordedExceptionCache.computeIfAbsent( resourceId, id -> new ExceptionCacheEntry())); - } else if (resource instanceof FlinkSessionJob) { + } else if (resource instanceof FlinkSessionJob flinkSessionJob) { var resourceId = ResourceID.fromResource(resource); - var flinkSessionJobId = jobId; return (FlinkResourceContext<CR>) new FlinkSessionJobContext( - (FlinkSessionJob) resource, + flinkSessionJob, josdkContext, resMg, configManager, @@ -146,24 +151,19 @@ public class FlinkResourceContextFactory { @VisibleForTesting protected FlinkService getFlinkService(FlinkResourceContext<?> ctx) { var deploymentMode = ctx.getDeploymentMode(); - switch (deploymentMode) { - case NATIVE: - return new NativeFlinkService( - ctx.getKubernetesClient(), - artifactManager, - clientExecutorService, - ctx.getOperatorConfig(), - eventRecorder); - case STANDALONE: - return new StandaloneFlinkService( - ctx.getKubernetesClient(), - artifactManager, - clientExecutorService, - ctx.getOperatorConfig()); - default: - throw new UnsupportedOperationException( - String.format("Unsupported deployment mode: %s", deploymentMode)); - } + return switch (deploymentMode) { + case NATIVE -> new NativeFlinkService( + ctx.getKubernetesClient(), + artifactManager, + clientExecutorService, + ctx.getOperatorConfig(), + eventRecorder); + case STANDALONE -> new StandaloneFlinkService( + ctx.getKubernetesClient(), + artifactManager, + clientExecutorService, + ctx.getOperatorConfig()); + }; } public <CR extends AbstractFlinkResource<?, ?>> void cleanup(CR flinkApp) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java index e3f22d47..f8665118 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java @@ -39,6 +39,7 @@ import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; import java.util.Collections; import java.util.List; @@ -130,6 +131,20 @@ public class EventSourceUtils { return new InformerEventSource<>(configuration, context); } + public static InformerEventSource<FlinkDeployment, FlinkBlueGreenDeployment> + getBlueGreenFlinkDeploymentInformerEventSource( + EventSourceContext<FlinkBlueGreenDeployment> context) { + var configuration = + InformerEventSourceConfiguration.from( + FlinkDeployment.class, FlinkBlueGreenDeployment.class) + .withSecondaryToPrimaryMapper( + Mappers.fromOwnerReferences(context.getPrimaryResourceClass())) + .withNamespacesInheritedFromController() + .withFollowControllerNamespacesChanges(true) + .build(); + return new InformerEventSource<>(configuration, context); + } + public static InformerEventSource<?, FlinkBlueGreenDeployment> getBlueGreenIngressInformerEventSource( EventSourceContext<FlinkBlueGreenDeployment> context) {
