This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 96f07a72 [FLINK-33081] Apply parallelism overrides during scale operation
96f07a72 is described below
commit 96f07a7296696f41dadfcaf827c95ddf22f83542
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Sep 14 14:32:23 2023 +0200
[FLINK-33081] Apply parallelism overrides during scale operation
---
.../operator/autoscaler/JobAutoScalerImpl.java | 203 +++++++++++----------
.../autoscaler/BacklogBasedScalingTest.java | 2 +-
.../operator/autoscaler/JobAutoScalerImplTest.java | 47 ++++-
.../AbstractFlinkResourceReconciler.java | 10 +-
.../reconciler/deployment/JobAutoScaler.java | 5 +-
.../deployment/NoopJobAutoscalerFactory.java | 7 +-
.../deployment/ApplicationReconcilerTest.java | 2 +-
7 files changed, 153 insertions(+), 123 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 356f0b51..b3013a0a 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
+import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -74,6 +76,22 @@ public class JobAutoScalerImpl implements JobAutoScaler {
this.infoManager = new AutoscalerInfoManager();
}
+ @Override
+ public void scale(FlinkResourceContext<?> ctx) {
+ var conf = ctx.getObserveConfig();
+ var resource = ctx.getResource();
+ var resourceId = ResourceID.fromResource(resource);
+ var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx,
resourceId);
+
+ try {
+ runScalingLogic(ctx, conf, resource, resourceId,
autoscalerMetrics);
+ } catch (Throwable e) {
+ onError(ctx, resource, autoscalerMetrics, e);
+ } finally {
+ applyParallelismOverrides(ctx);
+ }
+ }
+
@Override
public void cleanup(FlinkResourceContext<?> ctx) {
LOG.info("Cleaning up autoscaling meta data");
@@ -85,25 +103,21 @@ public class JobAutoScalerImpl implements JobAutoScaler {
infoManager.removeInfoFromCache(cr);
}
+ private void clearParallelismOverrides(FlinkResourceContext<?> ctx) throws
Exception {
+ var infoOpt = infoManager.getInfo(ctx.getResource(),
ctx.getKubernetesClient());
+ if (infoOpt.isPresent()) {
+ var info = infoOpt.get();
+ info.removeCurrentOverrides();
+ info.replaceInKubernetes(ctx.getKubernetesClient());
+ }
+ }
+
@VisibleForTesting
protected Map<String, String>
getParallelismOverrides(FlinkResourceContext<?> ctx) {
- var conf = ctx.getObserveConfig();
- try {
- var infoOpt = infoManager.getInfo(ctx.getResource(),
ctx.getKubernetesClient());
- if (infoOpt.isPresent()) {
- var info = infoOpt.get();
- // If autoscaler was disabled need to delete the overrides
- if (!conf.getBoolean(AUTOSCALER_ENABLED) &&
!info.getCurrentOverrides().isEmpty()) {
- info.removeCurrentOverrides();
- info.replaceInKubernetes(ctx.getKubernetesClient());
- } else {
- return info.getCurrentOverrides();
- }
- }
- } catch (Exception e) {
- LOG.error("Error while getting parallelism overrides", e);
- }
- return Map.of();
+ return infoManager
+ .getInfo(ctx.getResource(), ctx.getKubernetesClient())
+ .map(AutoScalerInfo::getCurrentOverrides)
+ .orElse(Map.of());
}
/**
@@ -111,8 +125,8 @@ public class JobAutoScalerImpl implements JobAutoScaler {
*
* @param ctx Resource context
*/
- @Override
- public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+ @VisibleForTesting
+ protected void applyParallelismOverrides(FlinkResourceContext<?> ctx) {
var overrides = getParallelismOverrides(ctx);
if (overrides.isEmpty()) {
return;
@@ -140,86 +154,87 @@ public class JobAutoScalerImpl implements JobAutoScaler {
ConfigurationUtils.convertValue(userOverrides,
String.class));
}
- @Override
- public boolean scale(FlinkResourceContext<?> ctx) {
+ private void runScalingLogic(
+ FlinkResourceContext<?> ctx,
+ Configuration conf,
+ AbstractFlinkResource<?, ?> resource,
+ ResourceID resourceId,
+ AutoscalerFlinkMetrics autoscalerMetrics)
+ throws Exception {
+
+ if (resource.getSpec().getJob() == null ||
!conf.getBoolean(AUTOSCALER_ENABLED)) {
+ LOG.debug("Autoscaler is disabled");
+ clearParallelismOverrides(ctx);
+ return;
+ }
- var conf = ctx.getObserveConfig();
- var resource = ctx.getResource();
- var resourceId = ResourceID.fromResource(resource);
- var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx,
resourceId);
+ var status = resource.getStatus();
+ if (status.getLifecycleState() != ResourceLifecycleState.STABLE
+ ||
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
+ LOG.info("Autoscaler is waiting for RUNNING job state");
+ lastEvaluatedMetrics.remove(resourceId);
+ return;
+ }
- try {
- if (resource.getSpec().getJob() == null ||
!conf.getBoolean(AUTOSCALER_ENABLED)) {
- LOG.debug("Job autoscaler is disabled");
- return false;
- }
-
- // Initialize metrics only if autoscaler is enabled
-
- var status = resource.getStatus();
- if (status.getLifecycleState() != ResourceLifecycleState.STABLE
- ||
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
- LOG.info("Job autoscaler is waiting for RUNNING job state");
- lastEvaluatedMetrics.remove(resourceId);
- return false;
- }
-
- var autoScalerInfo = infoManager.getOrCreateInfo(resource,
ctx.getKubernetesClient());
-
- var collectedMetrics =
- metricsCollector.updateMetrics(
- resource, autoScalerInfo, ctx.getFlinkService(),
conf);
-
- if (collectedMetrics.getMetricHistory().isEmpty()) {
- autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
- return false;
- }
- LOG.debug("Collected metrics: {}", collectedMetrics);
-
- var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
- LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
- lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
-
- initRecommendedParallelism(evaluatedMetrics);
- autoscalerMetrics.registerScalingMetrics(
-
collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
- () -> lastEvaluatedMetrics.get(resourceId));
-
- if (!collectedMetrics.isFullyCollected()) {
- // We have done an upfront evaluation, but we are not ready
for scaling.
- resetRecommendedParallelism(evaluatedMetrics);
- autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
- return false;
- }
-
- var specAdjusted =
- scalingExecutor.scaleResource(
- resource,
- autoScalerInfo,
- conf,
- evaluatedMetrics,
- ctx.getKubernetesClient());
-
- if (specAdjusted) {
- autoscalerMetrics.numScalings.inc();
- } else {
- autoscalerMetrics.numBalanced.inc();
- }
+ var autoScalerInfo = infoManager.getOrCreateInfo(resource,
ctx.getKubernetesClient());
+
+ var collectedMetrics =
+ metricsCollector.updateMetrics(
+ resource, autoScalerInfo, ctx.getFlinkService(), conf);
+ if (collectedMetrics.getMetricHistory().isEmpty()) {
autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
- return specAdjusted;
- } catch (Throwable e) {
- LOG.error("Error while scaling resource", e);
- autoscalerMetrics.numErrors.inc();
- eventRecorder.triggerEvent(
- resource,
- EventRecorder.Type.Warning,
- EventRecorder.Reason.AutoscalerError,
- EventRecorder.Component.Operator,
- e.getMessage(),
- ctx.getKubernetesClient());
- return false;
+ return;
}
+ LOG.debug("Collected metrics: {}", collectedMetrics);
+
+ var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+ LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceId, evaluatedMetrics);
+
+ initRecommendedParallelism(evaluatedMetrics);
+ autoscalerMetrics.registerScalingMetrics(
+
collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
+ () -> lastEvaluatedMetrics.get(resourceId));
+
+ if (!collectedMetrics.isFullyCollected()) {
+ // We have done an upfront evaluation, but we are not ready for
scaling.
+ resetRecommendedParallelism(evaluatedMetrics);
+ autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
+ return;
+ }
+
+ var parallelismChanged =
+ scalingExecutor.scaleResource(
+ resource,
+ autoScalerInfo,
+ conf,
+ evaluatedMetrics,
+ ctx.getKubernetesClient());
+
+ if (parallelismChanged) {
+ autoscalerMetrics.numScalings.inc();
+ } else {
+ autoscalerMetrics.numBalanced.inc();
+ }
+
+ autoScalerInfo.replaceInKubernetes(ctx.getKubernetesClient());
+ }
+
+ private void onError(
+ FlinkResourceContext<?> ctx,
+ AbstractFlinkResource<?, ? extends CommonStatus<?>> resource,
+ AutoscalerFlinkMetrics autoscalerMetrics,
+ Throwable e) {
+ LOG.error("Error while scaling resource", e);
+ autoscalerMetrics.numErrors.inc();
+ eventRecorder.triggerEvent(
+ resource,
+ EventRecorder.Type.Warning,
+ EventRecorder.Reason.AutoscalerError,
+ EventRecorder.Component.Operator,
+ e.getMessage(),
+ ctx.getKubernetesClient());
}
private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 37b105bb..31727983 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -412,7 +412,7 @@ public class BacklogBasedScalingTest extends
OperatorTestBase {
var now = Instant.ofEpochMilli(0);
setClocksTo(now);
metricsCollector.setJobUpdateTs(now);
- assertFalse(autoscaler.scale(getResourceContext(app, ctx)));
+ autoscaler.scale(getResourceContext(app, ctx));
assertTrue(getOrCreateInfo(app,
kubernetesClient).getMetricHistory().isEmpty());
assertTrue(eventCollector.events.isEmpty());
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index 2b27297e..9a0c8c44 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -87,7 +87,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
}
@Test
- void testMetricReporting() {
+ void testMetricReporting() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID,
Set.of(), 1, 10));
@@ -136,7 +136,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase
{
}
@Test
- void testErrorReporting() {
+ void testErrorReporting() throws Exception {
var autoscaler = new JobAutoScalerImpl(null, null, null,
eventRecorder);
FlinkResourceContext<FlinkDeployment> resourceContext =
getResourceContext(app);
ResourceID resourceId = ResourceID.fromResource(app);
@@ -153,6 +153,8 @@ public class JobAutoScalerImplTest extends OperatorTestBase
{
@Test
void testParallelismOverrides() throws Exception {
var autoscaler = new JobAutoScalerImpl(null, null, null,
eventRecorder);
+ app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
+ var appClone = ReconciliationUtils.clone(app);
var ctx = getResourceContext(app);
// Initially we should return empty overrides, do not create any CM
@@ -167,37 +169,64 @@ public class JobAutoScalerImplTest extends
OperatorTestBase {
autoscalerInfo.setCurrentOverrides(Map.of(v1, "1", v2, "2"));
autoscalerInfo.replaceInKubernetes(kubernetesClient);
- assertEquals(Map.of(v1, "1", v2, "2"),
autoscaler.getParallelismOverrides(ctx));
+ autoscaler.scale(ctx);
+ assertEquals(
+ Map.of(v1, "1", v2, "2"),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
// Disabling autoscaler should clear overrides
+ ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(),
"false");
- ctx = getResourceContext(app);
+
+ autoscaler.scale(ctx);
assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
// But not clear the autoscaler info
assertTrue(autoscaler.infoManager.getInfoFromKubernetes(app,
kubernetesClient).isPresent());
+ assertEquals(
+ Map.of(),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
int requestCount = mockWebServer.getRequestCount();
// Make sure we don't update in kubernetes once removed
- autoscaler.getParallelismOverrides(ctx);
+ ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
+ autoscaler.scale(ctx);
assertEquals(requestCount, mockWebServer.getRequestCount());
+ ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(),
"true");
- ctx = getResourceContext(app);
+ autoscaler.scale(ctx);
+
assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
+ assertEquals(
+ Map.of(),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
autoscalerInfo.setCurrentOverrides(Map.of(v1, "1", v2, "2"));
autoscalerInfo.replaceInKubernetes(kubernetesClient);
+ autoscaler.scale(ctx);
+
assertEquals(Map.of(v1, "1", v2, "2"),
autoscaler.getParallelismOverrides(ctx));
+ assertEquals(
+ Map.of(v1, "1", v2, "2"),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
+ ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
app.getSpec().getFlinkConfiguration().put(SCALING_ENABLED.key(),
"false");
- ctx = getResourceContext(app);
+
+ autoscaler.scale(ctx);
assertEquals(Map.of(v1, "1", v2, "2"),
autoscaler.getParallelismOverrides(ctx));
+ assertEquals(
+ Map.of(v1, "1", v2, "2"),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
// Test error handling
// Invalid config
+ ctx = getResourceContext(app = ReconciliationUtils.clone(appClone));
app.getSpec().getFlinkConfiguration().put(AUTOSCALER_ENABLED.key(),
"asd");
- ctx = getResourceContext(app);
- assertEquals(Map.of(), autoscaler.getParallelismOverrides(ctx));
+ autoscaler.scale(ctx);
+ assertEquals(
+ Map.of(v1, "1", v2, "2"),
+
ctx.getDeployConfig(app.getSpec()).get(PipelineOptions.PARALLELISM_OVERRIDES));
}
@Test
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index b82932f5..91edcf70 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -132,7 +132,7 @@ public abstract class AbstractFlinkResourceReconciler<
SPEC lastReconciledSpec =
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();
- resourceScaler.applyParallelismOverrides(ctx);
+ resourceScaler.scale(ctx);
var specDiff =
new ReflectiveDiffBuilder<>(
@@ -183,13 +183,7 @@ public abstract class AbstractFlinkResourceReconciler<
MSG_ROLLBACK,
ctx.getKubernetesClient());
} else if (!reconcileOtherChanges(ctx)) {
- if (resourceScaler.scale(ctx)) {
- LOG.info(
- "Rescheduling new reconciliation immediately to
execute scaling operation.");
- status.setImmediateReconciliationNeeded(true);
- } else {
- LOG.info("Resource fully reconciled, nothing to do...");
- }
+ LOG.info("Resource fully reconciled, nothing to do...");
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
index 80e1e185..f3e2790a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/JobAutoScaler.java
@@ -23,11 +23,8 @@ import
org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
public interface JobAutoScaler {
/** Called as part of the reconciliation loop to run the scaling logic and
apply the current parallelism overrides. */
- boolean scale(FlinkResourceContext<?> ctx);
+ void scale(FlinkResourceContext<?> ctx) throws Exception;
/** Called when the custom resource is deleted. */
void cleanup(FlinkResourceContext<?> ctx);
-
- /** Apply current parallelism overrides. */
- void applyParallelismOverrides(FlinkResourceContext<?> ctx);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
index 8cbd3f91..7b022ec4 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/NoopJobAutoscalerFactory.java
@@ -29,13 +29,8 @@ public class NoopJobAutoscalerFactory implements
JobAutoScalerFactory, JobAutoSc
}
@Override
- public boolean scale(FlinkResourceContext<?> ctx) {
- return false;
- }
+ public void scale(FlinkResourceContext<?> ctx) {}
@Override
public void cleanup(FlinkResourceContext<?> ctx) {}
-
- @Override
- public void applyParallelismOverrides(FlinkResourceContext<?> ctx) {}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index a0bea65d..504bf671 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -888,7 +888,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
(r) ->
new NoopJobAutoscalerFactory() {
@Override
- public void
applyParallelismOverrides(FlinkResourceContext<?> ctx) {
+ public void scale(FlinkResourceContext<?> ctx) {
overrideFunction.get().accept(ctx.getResource().getSpec());
}
};