This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 96f07a72 [FLINK-33081] Apply parallelism overrides during scale operation
96f07a72 is described below

commit 96f07a7296696f41dadfcaf827c95ddf22f83542
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Sep 14 14:32:23 2023 +0200

    [FLINK-33081] Apply parallelism overrides during scale operation
---
 .../operator/autoscaler/JobAutoScalerImpl.java     | 203 +++++++++++----------
 .../autoscaler/BacklogBasedScalingTest.java        |   2 +-
 .../operator/autoscaler/JobAutoScalerImplTest.java |  47 ++++-
 .../AbstractFlinkResourceReconciler.java           |  10 +-
 .../reconciler/deployment/JobAutoScaler.java       |   5 +-
 .../deployment/NoopJobAutoscalerFactory.java       |   7 +-
 .../deployment/ApplicationReconcilerTest.java      |   2 +-
 7 files changed, 153 insertions(+), 123 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 356f0b51..b3013a0a 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
+import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -74,6 +76,22 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         this.infoManager = new AutoscalerInfoManager();
     }
 
+    @Override
+    public void scale(FlinkResourceContext<?> ctx) {
+        var conf = ctx.getObserveConfig();
+        var resource = ctx.getResource();
+        var resourceId = ResourceID.fromResource(resource);
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, 
resourceId);
+
+        try {
+            runScalingLogic(ctx, conf, resource, resourceId, 
autoscalerMetrics);
+        } catch (Throwable e) {
+            onError(ctx, resource, autoscalerMetrics, e);
+        } finally {
+            applyParallelismOverrides(ctx);
+        }
+    }
+
     @Override
     public void cleanup(FlinkResourceContext<?> ctx) {
         LOG.info("Cleaning up autoscaling meta data");
@@ -85,25 +103,21 @@ public class JobAutoScalerImpl implements JobAutoScaler {
         infoManager.removeInfoFromCache(cr);
     }
 
+    private void clearParallelismOverrides(FlinkResourceContext<?> ctx) throws 
Exception {
+        var infoOpt = infoManager.getInfo(ctx.getResource(), 
ctx.getKubernetesClient());
+        if (infoOpt.isPresent()) {
+            var info = infoOpt.get();
+            info.removeCurrentOverrides();
+            info.replaceInKubernetes(ctx.getKubernetesClient());
+        }
+    }
+
     @VisibleForTesting
     protected Map<String, String> 
getParallelismOverrides(FlinkResourceContext<?> ctx) {
-        var conf = ctx.getObserveConfig();
-        try {
-            var infoOpt = infoManager.getInfo(ctx.getResource(), 
ctx.getKubernetesClient());
-            if (infoOpt.isPresent()) {
-                var info = infoOpt.get();
-                // If autoscaler was disabled need to delete the overrides
-                if (!conf.getBoolean(AUTOSCALER_ENABLED) && 
!info.getCurrentOverrides().isEmpty()) {
-                    info.removeCurrentOverrides();
-                    info.replaceInKubernetes(ctx.getKubernetesClient());
-                } else {
-                    return info.getCurrentOverrides();
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("Error while getting parallelism overrides", e);
-        }
-        return Map.of();
+        return infoManager
+                .getInfo(ctx.getResource(), ctx.getKubernetesClient())
+                .map(AutoScalerInfo::getCurrentOverrides)
+                .orElse(Map.of());
     }
 
     /**
@@ -111,8 +125,8 @@ public class JobAutoScalerImpl implements JobAutoScaler {
      *
      * @param ctx Resource context
      */
-    @Override
-    public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+    @VisibleForTesting
+    protected void applyParallelismOverrides(FlinkResourceContext<?> ctx) {
         var overrides = getParallelismOverrides(ctx);
         if (overrides.isEmpty()) {
             return;
@@ -140,86 +154,87 @@ public class JobAutoScalerImpl implements JobAutoScaler {
                         ConfigurationUtils.convertValue(userOverrides, 
String.class));
     }
 
-    @Override
-    public boolean scale(FlinkResourceContext<?> ctx) {
+    private void runScalingLogic(
+            FlinkResourceContext<?> ctx,
+            Configuration conf,
+            AbstractFlinkResource<?, ?> resource,
+            ResourceID resourceId,
+            AutoscalerFlinkMetrics autoscalerMetrics)
+            throws Exception {
+
+        if (resource.getSpec().getJob() == null || 
!conf.getBoolean(AUTOSCALER_ENABLED)) {
+            LOG.debug("Autoscaler is disabled");
+            clearParallelismOverrides(ctx);
+            return;
+        }
 
-        var conf = ctx.getObserveConfig();
-        var resource = ctx.getResource();
-        var resourceId = ResourceID.fromResource(resource);
-        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, 
resourceId);
+        var status = resource.getStatus();
+        if (status.getLifecycleState() != ResourceLifecycleState.STABLE
+                || 
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+            LOG.info("Autoscaler is waiting for RUNNING job state");
+            lastEvaluatedMetrics.remove(resourceId);
+            return;
+        }
 
-        try {
-            if (resource.getSpec().getJob() == null || 
!conf.getBoolean(AUTOSCALER_ENABLED)) {
-                LOG.debug("Job autoscaler is disabled");
-                return false;
-            }
-
-            // Initialize metrics only if autoscaler is enabled
-
-            var status = resource.getStatus();
-            if (status.getLifecycleState() != ResourceLifecycleState.STABLE
-                    || 
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
-                LOG.info("Job autoscaler is waiting for RUNNING job state");
-                lastEvaluatedMetrics.remove(resourceId);
-                return false;
-            }
-
-            var autoScalerInfo = infoManager.getOrCreateInfo(resource, 
ctx.getKubernetesClient());
-
-            var collectedMetrics =
-                    metricsCollector.updateMetrics(
-                            resource, autoScalerInfo, ctx.getFlinkService(), 
conf);
-
-            if (collectedMetrics.getMetricHistory().isEmpty()) {
-                autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
-                return false;
-            }
-            LOG.debug("Collected metrics: {}", collectedMetrics);
-
-            var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
-            LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
-            lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
-
-            initRecommendedParallelism(evaluatedMetrics);
-            autoscalerMetrics.registerScalingMetrics(
-                    
collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
-                    () -> lastEvaluatedMetrics.get(resourceId));
-
-            if (!collectedMetrics.isFullyCollected()) {
-                // We have done an upfront evaluation, but we are not ready 
for scaling.
-                resetRecommendedParallelism(evaluatedMetrics);
-                autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
-                return false;
-            }
-
-            var specAdjusted =
-                    scalingExecutor.scaleResource(
-                            resource,
-                            autoScalerInfo,
-                            conf,
-                            evaluatedMetrics,
-                            ctx.getKubernetesClient());
-
-            if (specAdjusted) {
-                autoscalerMetrics.numScalings.inc();
-            } else {
-                autoscalerMetrics.numBalanced.inc();
-            }
+        var autoScalerInfo = infoManager.getOrCreateInfo(resource, 
ctx.getKubernetesClient());
+
+        var collectedMetrics =
+                metricsCollector.updateMetrics(
+                        resource, autoScalerInfo, ctx.getFlinkService(), conf);
 
+        if (collectedMetrics.getMetricHistory().isEmpty()) {
             autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
-            return specAdjusted;
-        } catch (Throwable e) {
-            LOG.error("Error while scaling resource", e);
-            autoscalerMetrics.numErrors.inc();
-            eventRecorder.triggerEvent(
-                    resource,
-                    EventRecorder.Type.Warning,
-                    EventRecorder.Reason.AutoscalerError,
-                    EventRecorder.Component.Operator,
-                    e.getMessage(),
-                    ctx.getKubernetesClient());
-            return false;
+            return;
         }
+        LOG.debug("Collected metrics: {}", collectedMetrics);
+
+        var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+        LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
+        lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
+
+        initRecommendedParallelism(evaluatedMetrics);
+        autoscalerMetrics.registerScalingMetrics(
+                
collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
+                () -> lastEvaluatedMetrics.get(resourceId));
+
+        if (!collectedMetrics.isFullyCollected()) {
+            // We have done an upfront evaluation, but we are not ready for 
scaling.
+            resetRecommendedParallelism(evaluatedMetrics);
+            autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
+            return;
+        }
+
+        var parallelismChanged =
+                scalingExecutor.scaleResource(
+                        resource,
+                        autoScalerInfo,
+                        conf,
+                        evaluatedMetrics,
+                        ctx.getKubernetesClient());
+
+        if (parallelismChanged) {
+            autoscalerMetrics.numScalings.inc();
+        } else {
+            autoscalerMetrics.numBalanced.inc();
+        }
+
+        autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
+    }
+
+    private void onError(
+            FlinkResourceContext<?> ctx,
+            AbstractFlinkResource<?, ? extends CommonStatus<?>> resource,
+            AutoscalerFlinkMetrics autoscalerMetrics,
+            Throwable e) {
+        LOG.error("Error while scaling resource", e);
+        autoscalerMetrics.numErrors.inc();
+        eventRecorder.triggerEvent(
+                resource,
+                EventRecorder.Type.Warning,
+                EventRecorder.Reason.AutoscalerError,
+                EventRecorder.Component.Operator,
+                e.getMessage(),
+                ctx.getKubernetesClient());
     }
 
     private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 37b105bb..31727983 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -412,7 +412,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
         var now = Instant.ofEpochMilli(0);
         setClocksTo(now);
         metricsCollector.setJobUpdateTs(now);
-        assertFalse(autoscaler.scale(getResourceContext(app, ctx)));
+        autoscaler.scale(getResourceContext(app, ctx));
         assertTrue(getOrCreateInfo(app, 
kubernetesClient).getMetricHistory().isEmpty());
         assertTrue(eventCollector.events.isEmpty());
     }
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index 2b27297e..9a0c8c44 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -87,7 +87,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
     }
 
     @Test
-    void testMetricReporting() {
+    void testMetricReporting() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, 
Set.of(), 1, 10));
 
@@ -136,7 +136,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase 
{
     }
 
     @Test
-    void testErrorReporting() {
+    void testErrorReporting() throws Exception {
         var autoscaler = new JobAutoScalerImpl(null, null, null, 
eventRecorder);
         FlinkResourceContext<FlinkDeployment> resourceContext = 
getResourceContext(app);
         ResourceID resourceId = ResourceID.fromResource(app);
@@ -153,6 +153,8 @@ public class JobAutoScalerImplTest extends OperatorTestBase 
{
     @Test
     void testParallelismOverrides() throws Exception {
         var autoscaler = new JobAutoScalerImpl(null, null, null, 
eventRecorder);
+        app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
+        var appClone = ReconciliationUtils.clone(app);
         var ctx = getResourceContext(app);
 
         // Initially we should return empty overrides, do not crate any CM
@@ -167,37 +169,64 @@ public class JobAutoScalerImplTest extends 
OperatorTestBase {
         autoscalerInfo.setCurrentOverrides(Map.of(v1, "1", v2, "2"));
         autoscalerInfo.replaceInKubernetes(kubernetesClient);
 
-        assertEquals(Map.of(v1, "1", v2, "2"), 
autoscaler.getParallelismOverrides(ctx));
+        autoscaler.scale(ctx);
+        assertEquals(
+                Map.of(v1, "1", v2, "2"),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
 
         // Disabling autoscaler should clear overrides
+        ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
         app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(), 
"false");
-        ctx = getResourceContext(app);
+
+        autoscaler.scale(ctx);
         assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
         // But not clear the autoscaler info
         assertTrue(autoscaler.infoManager.getInfoFromKubernetes(app, 
kubernetesClient).isPresent());
+        assertEquals(
+                Map.of(),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
 
         int requestCount = mockWebServer.getRequestCount();
         // Make sure we don't update in kubernetes once removed
-        autoscaler.getParallelismOverrides(ctx);
+        ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
+        autoscaler.scale(ctx);
         assertEquals(requestCount, mockWebServer.getRequestCount());
 
+        ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
         app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(), 
"true");
-        ctx = getResourceContext(app);
+        autoscaler.scale(ctx);
+
         assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
+        assertEquals(
+                Map.of(),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
 
         autoscalerInfo.setCurrentOverrides(Map.of(v1, "1", v2, "2"));
         autoscalerInfo.replaceInKubernetes(kubernetesClient);
+        autoscaler.scale(ctx);
+
         assertEquals(Map.of(v1, "1", v2, "2"), 
autoscaler.getParallelismOverrides(ctx));
+        assertEquals(
+                Map.of(v1, "1", v2, "2"),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
 
+        ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
         app.getSpec().getFlinkConfiguration().put(SCALING_ENABLED.key(), 
"false");
-        ctx = getResourceContext(app);
+
+        autoscaler.scale(ctx);
         assertEquals(Map.of(v1, "1", v2, "2"), 
autoscaler.getParallelismOverrides(ctx));
+        assertEquals(
+                Map.of(v1, "1", v2, "2"),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
 
         // Test error handling
         // Invalid config
+        ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
         app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(), 
"asd");
-        ctx = getResourceContext(app);
-        assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
+        autoscaler.scale(ctx);
+        assertEquals(
+                Map.of(v1, "1", v2, "2"),
+                
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index b82932f5..91edcf70 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -132,7 +132,7 @@ public abstract class AbstractFlinkResourceReconciler<
         SPEC lastReconciledSpec =
                 
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();
-        resourceScaler.applyParallelismOverrides(ctx);
+        resourceScaler.scale(ctx);
 
         var specDiff =
                 new ReflectiveDiffBuilder<>(
@@ -183,13 +183,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     MSG_ROLLBACK,
                     ctx.getKubernetesClient());
         } else if (!reconcileOtherChanges(ctx)) {
-            if (resourceScaler.scale(ctx)) {
-                LOG.info(
-                        "Rescheduling new reconciliation immediately to 
execute scaling operation.");
-                status.setImmediateReconciliationNeeded(true);
-            } else {
-                LOG.info("Resource fully reconciled, nothing to do...");
-            }
+            LOG.info("Resource fully reconciled, nothing to do...");
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
index 80e1e185..f3e2790a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
@@ -23,11 +23,8 @@ import 
org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 public interface JobAutoScaler {
 
     /** Called as part of the reconciliation loop. Returns true if this call 
led to scaling. */
-    boolean scale(FlinkResourceContext<?> ctx);
+    void scale(FlinkResourceContext<?> ctx) throws Exception;
 
     /** Called when the custom resource is deleted. */
     void cleanup(FlinkResourceContext<?> ctx);
-
-    /** Apply current parallelism overrides. */
-    void applyParallelismOverrides(FlinkResourceContext<?> ctx);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
index 8cbd3f91..7b022ec4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
@@ -29,13 +29,8 @@ public class NoopJobAutoscalerFactory implements 
JobAutoScalerFactory, JobAutoSc
     }
 
     @Override
-    public boolean scale(FlinkResourceContext<?> ctx) {
-        return false;
-    }
+    public void scale(FlinkResourceContext<?> ctx) {}
 
     @Override
     public void cleanup(FlinkResourceContext<?> ctx) {}
-
-    @Override
-    public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {}
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index a0bea65d..504bf671 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -888,7 +888,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 (r) ->
                         new NoopJobAutoscalerFactory() {
                             @Override
-                            public void 
applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+                            public void scale(FlinkResourceContext<?> ctx) {
                                 
overrideFunction.get().accept(ctx.getResource().getSpec());
                             }
                         };

Reply via email to