1996fanrui commented on code in PR #677:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1351975279
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -15,149 +15,180 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-import
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
import java.time.Duration;
import java.util.List;
-import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
/** Config options related to the autoscaler module. */
public class AutoScalerOptions {
+ public static final String DEPRECATED_K8S_OP_CONF_PREFIX =
"kubernetes.operator.";
+ public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+ private static String deprecatedOperatorConfigKey(String key) {
+ return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+ }
+
+ private static String autoScalerConfigKey(String key) {
+ return AUTOSCALER_CONF_PREFIX + key;
+ }
+
private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
- return operatorConfig("job.autoscaler." + key);
+ return ConfigOptions.key(autoScalerConfigKey(key));
}
public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
autoScalerConfig("enabled")
.booleanType()
.defaultValue(false)
+ .withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))
Review Comment:
Added:
> Note: The option prefix `kubernetes.operator.` was removed in FLIP-334,
because the autoscaler module was decoupled from flink-kubernetes-operator.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+ implements JobAutoScaler<KEY, Context> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+ @VisibleForTesting protected static final String AUTOSCALER_ERROR =
"AutoscalerError";
Review Comment:
It's used in `BacklogBasedScalingTest`.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+ public static <KEY, Context extends JobAutoScalerContext<KEY>> void
addToScalingHistoryAndStore(
+ AutoScalerStateStore<KEY, Context> stateStore,
+ Context context,
+ Instant now,
+ Map<JobVertexID, ScalingSummary> summaries)
+ throws Exception {
+ addToScalingHistoryAndStore(
+ stateStore,
+ context,
+ getTrimmedScalingHistory(stateStore, context, now),
+ now,
+ summaries);
+ }
+
+ public static <KEY, Context extends JobAutoScalerContext<KEY>> void
addToScalingHistoryAndStore(
+ AutoScalerStateStore<KEY, Context> stateStore,
+ Context context,
+ Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory,
+ Instant now,
+ Map<JobVertexID, ScalingSummary> summaries)
+ throws Exception {
+
+ summaries.forEach(
+ (id, summary) ->
+ scalingHistory.computeIfAbsent(id, j -> new
TreeMap<>()).put(now, summary));
+ stateStore.storeScalingHistory(context, scalingHistory);
+ }
+
+ public static <KEY, Context extends JobAutoScalerContext<KEY>> void
updateVertexList(
+ AutoScalerStateStore<KEY, Context> stateStore,
+ Context ctx,
+ Instant now,
+ Set<JobVertexID> vertexSet)
+ throws Exception {
+ Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
trimmedScalingHistory =
+ getTrimmedScalingHistory(stateStore, ctx, now);
+
+ if (trimmedScalingHistory.keySet().removeIf(v ->
!vertexSet.contains(v))) {
+ stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+ }
+ }
+
+ @Nonnull
+ public static <KEY, Context extends JobAutoScalerContext<KEY>>
+ Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
getTrimmedScalingHistory(
+ AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+ Context context,
+ Instant now)
+ throws Exception {
+ var conf = context.getConfiguration();
+ return autoScalerStateStore
+ .getScalingHistory(context)
+ .map(
+ scalingHistory -> {
+ var entryIt = scalingHistory.entrySet().iterator();
+ while (entryIt.hasNext()) {
+ var entry = entryIt.next();
+ // Limit how long past scaling decisions are
remembered
+ entry.setValue(
+ entry.getValue()
+ .tailMap(
+ now.minus(
+ conf.get(
+
AutoScalerOptions
+
.VERTEX_SCALING_HISTORY_AGE))));
Review Comment:
Sorry, I didn't understand this comment. I didn't change this code; it was
moved from `AutoScalerInfo`[1].
Do you mean that the result of `tailMap` cannot be used directly, and that we
should copy it into a new TreeMap instead?
[1]
https://github.com/apache/flink-kubernetes-operator/blob/305498a9ab2e04ab71a4c2d87f2edb746373df1a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java#L130
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder
autoScalerConfig(String key) {
.stringType()
.asList()
.defaultValues()
+
.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
.withDescription(
"A (semicolon-separated) list of vertex ids in
hexstring for which to disable scaling. Caution: For non-sink vertices this
will still scale their downstream operators until
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+ public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+ autoScalerConfig("flink.client.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
+ .withDescription("The timeout for waiting the flink rest
client to return.");
Review Comment:
I didn't use Flink's `client-timeout` option because it applies to the Flink
client process, e.g. when the Flink client starts a job.
This option is the Flink REST client timeout. Before this PR, it used
`org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions#OPERATOR_FLINK_CLIENT_TIMEOUT`.
In this PR, I call
`conf.set(AutoScalerOptions.FLINK_CLIENT_TIMEOUT,
getOperatorConfig().getFlinkClientTimeout());` inside createJobAutoScalerContext.
The new option is similar to `OPERATOR_FLINK_CLIENT_TIMEOUT`, and its
default value is 10s. That's why I added an option here.
If it's not clear, how about renaming it to `flink.rest-client.timeout`?
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+ @Test
+ public void testAggregateMultiplePendingRecordsMetricsPerSource() throws
Exception {
+ RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+ new RestApiMetricsCollector<>();
+
+ JobVertexID jobVertexID = new JobVertexID();
+ Map<String, FlinkMetric> flinkMetrics =
+ Map.of(
+ "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+ "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+ Map<JobVertexID, Map<String, FlinkMetric>> metrics =
Map.of(jobVertexID, flinkMetrics);
+
+ List<AggregatedMetric> aggregatedMetricsResponse =
+ List.of(
+ new AggregatedMetric(
+ "a.pendingRecords", Double.NaN, Double.NaN,
Double.NaN, 100.),
+ new AggregatedMetric(
+ "b.pendingRecords", Double.NaN, Double.NaN,
Double.NaN, 100.),
+ new AggregatedMetric(
+ "c.unrelated", Double.NaN, Double.NaN,
Double.NaN, 100.));
+
+ Configuration conf = new Configuration();
+ RestClusterClient<String> restClusterClient =
+ new RestClusterClient<>(
+ conf,
+ "test-cluster",
+ (c, e) -> new StandaloneClientHAServices("localhost"))
{
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(
+ M messageHeaders, U messageParameters, R
request) {
+ if (messageHeaders instanceof
AggregatedSubtaskMetricsHeaders) {
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ new AggregatedMetricsResponseBody(
+
aggregatedMetricsResponse));
+ }
+ return (CompletableFuture<P>)
+
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+ };
+
+ JobID jobID = new JobID();
+ JobAutoScalerContext<JobID> context =
+ new JobAutoScalerContext<>(
+ jobID,
+ jobID,
+ conf,
+ new UnregisteredMetricsGroup(),
+ () -> restClusterClient);
+
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap
=
+ collector.queryAllAggregatedMetrics(context, metrics);
+
+ System.out.println(jobVertexIDMapMap);
Review Comment:
Sorry, what is the `left-over` here? I just copied it from the old code. 😂
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,7 +155,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric>
evaluateMetrics(
}
@VisibleForTesting
- protected static void computeProcessingRateThresholds(
+ public static void computeProcessingRateThresholds(
Review Comment:
My mistake; reverted.
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+ @Test
+ public void testAggregateMultiplePendingRecordsMetricsPerSource() throws
Exception {
+ RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+ new RestApiMetricsCollector<>();
+
+ JobVertexID jobVertexID = new JobVertexID();
+ Map<String, FlinkMetric> flinkMetrics =
+ Map.of(
+ "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+ "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+ Map<JobVertexID, Map<String, FlinkMetric>> metrics =
Map.of(jobVertexID, flinkMetrics);
+
+ List<AggregatedMetric> aggregatedMetricsResponse =
+ List.of(
+ new AggregatedMetric(
+ "a.pendingRecords", Double.NaN, Double.NaN,
Double.NaN, 100.),
+ new AggregatedMetric(
+ "b.pendingRecords", Double.NaN, Double.NaN,
Double.NaN, 100.),
+ new AggregatedMetric(
+ "c.unrelated", Double.NaN, Double.NaN,
Double.NaN, 100.));
+
+ Configuration conf = new Configuration();
+ RestClusterClient<String> restClusterClient =
+ new RestClusterClient<>(
+ conf,
+ "test-cluster",
+ (c, e) -> new StandaloneClientHAServices("localhost"))
{
+ @Override
+ public <
+ M extends MessageHeaders<R, P, U>,
+ U extends MessageParameters,
+ R extends RequestBody,
+ P extends ResponseBody>
+ CompletableFuture<P> sendRequest(
+ M messageHeaders, U messageParameters, R
request) {
+ if (messageHeaders instanceof
AggregatedSubtaskMetricsHeaders) {
+ return (CompletableFuture<P>)
+ CompletableFuture.completedFuture(
+ new AggregatedMetricsResponseBody(
+
aggregatedMetricsResponse));
+ }
+ return (CompletableFuture<P>)
+
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+ };
+
+ JobID jobID = new JobID();
+ JobAutoScalerContext<JobID> context =
+ new JobAutoScalerContext<>(
+ jobID,
+ jobID,
+ conf,
+ new UnregisteredMetricsGroup(),
+ () -> restClusterClient);
+
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap
=
+ collector.queryAllAggregatedMetrics(context, metrics);
+
+ System.out.println(jobVertexIDMapMap);
Review Comment:
Sorry, what is the `left-over` here? I just copied it from the old code. 😂
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -38,42 +36,42 @@
import java.util.Map;
import java.util.SortedMap;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Component responsible for computing vertex parallelism based on the
scaling metrics. */
-public class JobVertexScaler {
+public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
private static final Logger LOG =
LoggerFactory.getLogger(JobVertexScaler.class);
+ @VisibleForTesting public static final String INEFFECTIVE_SCALING =
"IneffectiveScaling";
Review Comment:
It's used in `JobVertexScalerTest`, and I changed it to `protected`.
##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
now = now.plus(Duration.ofSeconds(10));
setClocksTo(now);
- restart(now);
-
- // after restart while the job is not running the evaluated metrics
are gone
- autoscaler.scale(getResourceContext(app, ctx));
- assertEquals(3, getOrCreateInfo(app,
kubernetesClient).getMetricHistory().size());
-
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
- scaledParallelism =
ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
- assertEquals(4, scaledParallelism.get(source));
- assertEquals(4, scaledParallelism.get(sink));
-
- now = now.plus(Duration.ofSeconds(1));
- setClocksTo(now);
- running(now);
- // once the job is running we got back the evaluated metric except the
recommended
- // parallelisms (until the metric window is full again)
- autoscaler.scale(getResourceContext(app, ctx));
- assertEquals(1, getOrCreateInfo(app,
kubernetesClient).getMetricHistory().size());
+ autoscaler.scale(context);
+ assertEvaluatedMetricsSize(1);
assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
- assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
- assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
- scaledParallelism =
ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
+ assertEquals(4., getCurrentMetricValue(source,
RECOMMENDED_PARALLELISM));
+ assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+ scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(4, scaledParallelism.get(source));
assertEquals(4, scaledParallelism.get(sink));
}
+ private void assertEvaluatedMetricsSize(int expectedSize) {
+ Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+ stateStore.getEvaluatedMetrics(context);
+ assertThat(evaluatedMetricsOpt).isPresent();
+ assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+ }
+
private Double getCurrentMetricValue(JobVertexID jobVertexID,
ScalingMetric scalingMetric) {
var metric =
autoscaler
.lastEvaluatedMetrics
- .get(ResourceID.fromResource(app))
+ .get(context.getJobKey())
.get(jobVertexID)
.get(scalingMetric);
return metric == null ? null : metric.getCurrent();
}
- private void restart(Instant now) {
- metricsCollector.setJobUpdateTs(now);
- app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
- }
-
- private void running(Instant now) {
- metricsCollector.setJobUpdateTs(now);
- app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
- }
-
Review Comment:
This comment raises the same question as the previous comment, so I'll reply
to both together.
The reason is that the `waitingForRunning` logic was moved from the `autoscaler`
module to
`org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler#waitingForRunning`[1].
This logic is no longer part of the generic `autoscaler`. I will move this test
to the `flink-kubernetes-operator` module later.
[1]
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1341955135
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]