This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 58b8ea8c [FLINK-33993] Fix misleading events for scaling effectiveness
detection (#748)
58b8ea8c is described below
commit 58b8ea8ca3ce12d6f7c8d0b893a5e2d29c506cfd
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Jan 5 10:58:53 2024 +0100
[FLINK-33993] Fix misleading events for scaling effectiveness detection
(#748)
When blocking of ineffective scaling decisions is turned off, events are
still generated that look like this:
```
Skipping further scale up after ineffective previous scale up for
65c763af14a952c064c400d516c25529
```
This is misleading because no action will be taken. It is fair to inform
users about an ineffective scale up even when the feature is disabled, but a
different message should be printed to convey that no action will be taken.
---
.../apache/flink/autoscaler/JobVertexScaler.java | 15 +++++++++++---
.../flink/autoscaler/JobVertexScalerTest.java | 23 ++++++++++++++++++++--
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 915cd2c4..01f6d940 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -58,7 +58,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
@VisibleForTesting
protected static final String INEFFECTIVE_MESSAGE_FORMAT =
- "Skipping further scale up after ineffective previous scale up for
%s";
+ "Ineffective scaling detected for %s (expected increase: %s,
actual increase %s). Blocking of ineffective scaling decisions is %s";
private Clock clock = Clock.system(ZoneId.systemDefault());
@@ -214,7 +214,16 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
return false;
}
- var message = String.format(INEFFECTIVE_MESSAGE_FORMAT, vertex);
+ boolean blockIneffectiveScalings =
+
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED);
+
+ var message =
+ String.format(
+ INEFFECTIVE_MESSAGE_FORMAT,
+ vertex,
+ expectedIncrease,
+ actualIncrease,
+ blockIneffectiveScalings ? "enabled" : "disabled");
autoScalerEventHandler.handleEvent(
context,
@@ -224,7 +233,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
null,
conf.get(SCALING_EVENT_INTERVAL));
- if
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
+ if (blockIneffectiveScalings) {
LOG.warn(
"Ineffective scaling detected for {}, expected increase
{}, actual {}",
vertex,
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 0f9242e7..f87fbebe 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -431,7 +431,9 @@ public class JobVertexScalerTest {
var event = eventCollector.events.poll();
assertThat(event).isNotNull();
assertThat(event.getMessage())
- .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT,
jobVertexID));
+ .isEqualTo(
+ String.format(
+ INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0,
5.0, "enabled"));
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
assertEquals(1, event.getCount());
@@ -463,9 +465,26 @@ public class JobVertexScalerTest {
event = eventCollector.events.poll();
assertThat(event).isNotNull();
assertThat(event.getMessage())
- .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT,
jobVertexID));
+ .isEqualTo(
+ String.format(
+ INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0,
5.0, "enabled"));
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
assertEquals(2, event.getCount());
+
+ // Test ineffective scaling switched off
+ conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
false);
+ assertEquals(
+ 40,
+ vertexScaler.computeScaleTargetParallelism(
+ context, jobVertexID, evaluated, history,
restartTime));
+ assertEquals(1, eventCollector.events.size());
+ event = eventCollector.events.poll();
+ assertThat(event).isNotNull();
+ assertThat(event.getMessage())
+ .isEqualTo(
+ String.format(
+ INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0,
5.0, "disabled"));
+ assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
}
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(