This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 02840b96 [FLINK-33527] Simplify state store cleanup logic (#710)
02840b96 is described below
commit 02840b96ef3116ea95a440af4f945398900d89df
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Nov 15 15:16:41 2023 +0100
[FLINK-33527] Simplify state store cleanup logic (#710)
---
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 38 +------
.../autoscaler/state/AutoScalerStateStore.java | 9 +-
.../state/InMemoryAutoScalerStateStore.java | 7 ++
.../flink/autoscaler/BacklogBasedScalingTest.java | 9 +-
.../flink/autoscaler/JobAutoScalerImplTest.java | 71 +++---------
.../autoscaler/RecommendedParallelismTest.java | 9 +-
.../state/TestingAutoscalerStateStore.java | 37 ------
.../operator/autoscaler/AutoscalerFactory.java | 2 +
.../autoscaler/{ => state}/ConfigMapStore.java | 99 +++++-----------
.../operator/autoscaler/state/ConfigMapView.java | 117 +++++++++++++++++++
.../KubernetesAutoScalerStateStore.java | 8 +-
.../KubernetesAutoScalerEventHandlerTest.java | 2 +
.../autoscaler/{ => state}/ConfigMapStoreTest.java | 126 ++++++++++++++++++---
.../KubernetesAutoScalerStateStoreTest.java | 43 ++++++-
14 files changed, 351 insertions(+), 226 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 0e2afb55..1cfdba62 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
import static
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
import static
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
-import static
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.trimScalingHistory;
/** The default implementation of {@link JobAutoScaler}. */
public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
@@ -90,7 +89,8 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
try {
if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
LOG.debug("Autoscaler is disabled");
- clearStatesAfterAutoscalerDisabled(ctx);
+ stateStore.clearAll(ctx);
+ stateStore.flush(ctx);
return;
}
@@ -120,40 +120,6 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
stateStore.removeInfoFromCache(jobKey);
}
- private void clearStatesAfterAutoscalerDisabled(Context ctx) throws
Exception {
- var needFlush = false;
- var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
- if (!parallelismOverrides.isEmpty()) {
- needFlush = true;
- stateStore.removeParallelismOverrides(ctx);
- }
-
- var collectedMetrics = stateStore.getCollectedMetrics(ctx);
- if (!collectedMetrics.isEmpty()) {
- needFlush = true;
- stateStore.removeCollectedMetrics(ctx);
- }
-
- var scalingHistory = stateStore.getScalingHistory(ctx);
- if (!scalingHistory.isEmpty()) {
- var trimmedScalingHistory =
- trimScalingHistory(clock.instant(),
ctx.getConfiguration(), scalingHistory);
- if (trimmedScalingHistory.isEmpty()) {
- // All scaling histories are trimmed.
- needFlush = true;
- stateStore.removeScalingHistory(ctx);
- } else if (!scalingHistory.equals(trimmedScalingHistory)) {
- // Some scaling histories are trimmed.
- needFlush = true;
- stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
- }
- }
-
- if (needFlush) {
- stateStore.flush(ctx);
- }
- }
-
@VisibleForTesting
protected Map<String, String> getParallelismOverrides(Context ctx) throws
Exception {
return stateStore.getParallelismOverrides(ctx);
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index bb3329a4..6f9e54e2 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -64,10 +64,13 @@ public interface AutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
void removeParallelismOverrides(Context jobContext) throws Exception;
+ /** Removes all data from this context. Flush still needs to be called. */
+ void clearAll(Context jobContext) throws Exception;
+
/**
- * Flushing is needed because we just save data in cache for all store
methods. For less write
- * operations, we flush the cached data to the physical storage only after
all operations have
- * been performed.
+ * Flushing is needed because we do not persist data for all store methods
until this method is
+ * called. Note: The state store implementation should try to avoid write
operations unless data
+ * was changed through this interface.
*/
void flush(Context jobContext) throws Exception;
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 29176365..caf4e4fd 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -105,6 +105,13 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
parallelismOverridesStore.remove(jobContext.getJobKey());
}
+ @Override
+ public void clearAll(Context jobContext) {
+ scalingHistoryStore.remove(jobContext.getJobKey());
+ parallelismOverridesStore.remove(jobContext.getJobKey());
+ collectedMetricsStore.remove(jobContext.getJobKey());
+ }
+
@Override
public void flush(Context jobContext) {
// The InMemory state store doesn't persist data.
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 810bffe3..f3198188 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -24,7 +24,8 @@ import
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -54,7 +55,7 @@ public class BacklogBasedScalingTest {
private JobAutoScalerContext<JobID> context;
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>>
eventCollector;
- private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
+ private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>
metricsCollector;
private ScalingExecutor<JobID, JobAutoScalerContext<JobID>>
scalingExecutor;
@@ -68,7 +69,7 @@ public class BacklogBasedScalingTest {
context = createDefaultJobAutoScalerContext();
eventCollector = new TestingEventCollector<>();
- stateStore = new TestingAutoscalerStateStore<>();
+ stateStore = new InMemoryAutoScalerStateStore<>();
scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore);
@@ -395,7 +396,7 @@ public class BacklogBasedScalingTest {
assertTrue(eventCollector.events.isEmpty());
}
- private void assertEvaluatedMetricsSize(int expectedSize) {
+ private void assertEvaluatedMetricsSize(int expectedSize) throws Exception
{
SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
stateStore.getCollectedMetrics(context);
assertThat(evaluatedMetrics).hasSize(expectedSize);
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 7f507dd4..a553c1b7 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -26,7 +26,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
@@ -43,16 +44,13 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
-import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
import java.util.TreeMap;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
@@ -61,6 +59,8 @@ import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABL
import static
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for JobAutoScalerImpl. */
public class JobAutoScalerImplTest {
@@ -68,7 +68,7 @@ public class JobAutoScalerImplTest {
private JobAutoScalerContext<JobID> context;
private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>>
scalingRealizer;
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>>
eventCollector;
- private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
+ private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
@BeforeEach
public void setup() {
@@ -77,7 +77,7 @@ public class JobAutoScalerImplTest {
scalingRealizer = new TestingScalingRealizer<>();
eventCollector = new TestingEventCollector<>();
- stateStore = new TestingAutoscalerStateStore<>();
+ stateStore = new InMemoryAutoScalerStateStore<>();
}
@Test
@@ -209,10 +209,8 @@ public class JobAutoScalerImplTest {
assertThat(autoscaler.getParallelismOverrides(context)).isEmpty();
assertParallelismOverrides(null);
- int requestCount = stateStore.getFlushCount();
// Make sure we don't update in kubernetes once removed
autoscaler.scale(context);
- assertEquals(requestCount, stateStore.getFlushCount());
context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true");
autoscaler.applyParallelismOverrides(context);
@@ -299,60 +297,25 @@ public class JobAutoScalerImplTest {
scalingHistory.put(Instant.ofEpochMilli(100), new ScalingSummary());
scalingHistory.put(Instant.ofEpochMilli(200), new ScalingSummary());
- // Test all scaling aren't expired
- getInstantScalingSummaryTreeMap(
- scalingHistory, Clock.fixed(Instant.ofEpochMilli(250),
ZoneId.systemDefault()), 2);
+ stateStore.storeScalingHistory(context, Map.of(new JobVertexID(),
scalingHistory));
+ assertFalse(stateStore.getScalingHistory(context).isEmpty());
- // Test one scaling aren't expired
- getInstantScalingSummaryTreeMap(
- scalingHistory, Clock.fixed(Instant.ofEpochMilli(350),
ZoneId.systemDefault()), 1);
+ stateStore.storeParallelismOverrides(context, Map.of("vertex", "4"));
+ assertFalse(stateStore.getParallelismOverrides(context).isEmpty());
- // Test all scaling are expired
- getInstantScalingSummaryTreeMap(
- scalingHistory, Clock.fixed(Instant.ofEpochMilli(450),
ZoneId.systemDefault()), 0);
- }
+ TreeMap<Instant, CollectedMetrics> metrics = new TreeMap<>();
+ metrics.put(Instant.now(), new CollectedMetrics());
+ stateStore.storeCollectedMetrics(context, metrics);
+ assertFalse(stateStore.getCollectedMetrics(context).isEmpty());
- private void getInstantScalingSummaryTreeMap(
- SortedMap<Instant, ScalingSummary> scalingHistoryData,
- Clock clock,
- int expectedScalingHistorySize)
- throws Exception {
- stateStore = new TestingAutoscalerStateStore<>();
var autoscaler =
new JobAutoScalerImpl<>(
null, null, null, eventCollector, scalingRealizer,
stateStore);
-
- enrichStateStore(scalingHistoryData);
- stateStore.flush(context);
- assertThat(stateStore.getFlushCount()).isEqualTo(1);
-
- autoscaler.setClock(clock);
autoscaler.scale(context);
- assertThat(stateStore.getParallelismOverrides(context)).isEmpty();
- assertThat(stateStore.getCollectedMetrics(context)).isEmpty();
-
- if (expectedScalingHistorySize > 0) {
- Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory =
- stateStore.getScalingHistory(context);
- assertThat(scalingHistory).isNotEmpty();
- assertThat(scalingHistory.values())
- .allMatch(aa -> aa.size() == expectedScalingHistorySize);
- } else {
- assertThat(stateStore.getScalingHistory(context)).isEmpty();
- }
- assertThat(stateStore.getFlushCount()).isEqualTo(2);
- }
-
- private void enrichStateStore(SortedMap<Instant, ScalingSummary>
scalingHistory) {
- var v1 = new JobVertexID();
- var v2 = new JobVertexID();
- stateStore.storeParallelismOverrides(
- context, Map.of(v1.toString(), "1", v2.toString(), "2"));
-
- var metricHistory = new TreeMap<Instant, CollectedMetrics>();
- stateStore.storeCollectedMetrics(context, metricHistory);
- stateStore.storeScalingHistory(context, Map.of(v1, scalingHistory, v2,
scalingHistory));
+ assertTrue(stateStore.getScalingHistory(context).isEmpty());
+ assertTrue(stateStore.getScalingHistory(context).isEmpty());
+ assertTrue(stateStore.getParallelismOverrides(context).isEmpty());
}
private void assertParallelismOverrides(Map<String, String>
expectedOverrides) {
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 1884268e..87025b93 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -25,7 +25,8 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -55,7 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RecommendedParallelismTest {
private JobAutoScalerContext<JobID> context;
- private TestingAutoscalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
+ private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
stateStore;
private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>
metricsCollector;
private ScalingExecutor<JobID, JobAutoScalerContext<JobID>>
scalingExecutor;
@@ -70,7 +71,7 @@ public class RecommendedParallelismTest {
TestingEventCollector<JobID, JobAutoScalerContext<JobID>>
eventCollector =
new TestingEventCollector<>();
- stateStore = new TestingAutoscalerStateStore<>();
+ stateStore = new InMemoryAutoScalerStateStore<>();
scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore);
@@ -227,7 +228,7 @@ public class RecommendedParallelismTest {
assertEquals(4, scaledParallelism.get(sink));
}
- private void assertEvaluatedMetricsSize(int expectedSize) {
+ private void assertEvaluatedMetricsSize(int expectedSize) throws Exception
{
SortedMap<Instant, CollectedMetrics> evaluatedMetrics =
stateStore.getCollectedMetrics(context);
assertThat(evaluatedMetrics).hasSize(expectedSize);
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
deleted file mode 100644
index 3b3fafa1..00000000
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/TestingAutoscalerStateStore.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.autoscaler.state;
-
-import org.apache.flink.autoscaler.JobAutoScalerContext;
-
-/** Testing {@link AutoScalerStateStore} implementation. */
-public class TestingAutoscalerStateStore<KEY, Context extends
JobAutoScalerContext<KEY>>
- extends InMemoryAutoScalerStateStore<KEY, Context> {
-
- private int flushCount;
-
- @Override
- public void flush(Context jobContext) {
- super.flush(jobContext);
- flushCount++;
- }
-
- public int getFlushCount() {
- return flushCount;
- }
-}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
index 97e70267..dea6df0b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.autoscaler.JobAutoScalerImpl;
import org.apache.flink.autoscaler.RestApiMetricsCollector;
import org.apache.flink.autoscaler.ScalingExecutor;
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
+import
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
similarity index 52%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
index 5ded9719..b840a292 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStore.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
import org.apache.flink.annotation.VisibleForTesting;
+import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.ConfigMap;
@@ -28,8 +29,6 @@ import
io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -44,13 +43,12 @@ public class ConfigMapStore {
private final KubernetesClient kubernetesClient;
- // The cache for each resourceId may be in three states:
- // 1. The resourceId doesn't exist : ConfigMap isn't loaded from
kubernetes, or it's deleted
- // 2 Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
- // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it
may not be the same
- // if not flushed already
- private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
- new ConcurrentHashMap<>();
+ /**
+ * Cache for Kubernetes ConfigMaps which reflects the latest state of a
ConfigMap for a
+ * ResourceId. Any changes to the ConfigMap are only reflected in
Kubernetes once the flush()
+ * method is called.
+ */
+ private final ConcurrentHashMap<ResourceID, ConfigMapView> cache = new
ConcurrentHashMap<>();
public ConfigMapStore(KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
@@ -58,34 +56,30 @@ public class ConfigMapStore {
protected void putSerializedState(
KubernetesJobAutoScalerContext jobContext, String key, String
value) {
- getOrCreateState(jobContext).put(key, value);
+ getConfigMap(jobContext).put(key, value);
}
protected Optional<String> getSerializedState(
KubernetesJobAutoScalerContext jobContext, String key) {
- return getConfigMap(jobContext).map(configMap ->
configMap.getData().get(key));
+ return Optional.ofNullable(getConfigMap(jobContext).get(key));
}
protected void removeSerializedState(KubernetesJobAutoScalerContext
jobContext, String key) {
- getConfigMap(jobContext)
- .ifPresentOrElse(
- configMap -> configMap.getData().remove(key),
- () -> {
- throw new IllegalStateException(
- "The configMap isn't created, so the
remove is unavailable.");
- });
+ getConfigMap(jobContext).removeKey(key);
+ }
+
+ public void clearAll(KubernetesJobAutoScalerContext jobContext) {
+ getConfigMap(jobContext).clear();
}
public void flush(KubernetesJobAutoScalerContext jobContext) {
- Optional<ConfigMap> configMapOpt = cache.get(jobContext.getJobKey());
- if (configMapOpt == null || configMapOpt.isEmpty()) {
- LOG.debug("The configMap isn't updated, so skip the flush.");
+ ConfigMapView configMapView = cache.get(jobContext.getJobKey());
+ if (configMapView == null) {
+ LOG.debug("The configMap doesn't exist, so skip the flush.");
return;
}
try {
- cache.put(
- jobContext.getJobKey(),
-
Optional.of(kubernetesClient.resource(configMapOpt.get()).update()));
+ configMapView.flush();
} catch (Exception e) {
LOG.error(
"Error while updating autoscaler info configmap,
invalidating to clear the cache",
@@ -99,49 +93,17 @@ public class ConfigMapStore {
cache.remove(resourceID);
}
- private Optional<ConfigMap> getConfigMap(KubernetesJobAutoScalerContext
jobContext) {
+ private ConfigMapView getConfigMap(KubernetesJobAutoScalerContext
jobContext) {
return cache.computeIfAbsent(
jobContext.getJobKey(), (id) ->
getConfigMapFromKubernetes(jobContext));
}
- private Map<String, String>
getOrCreateState(KubernetesJobAutoScalerContext jobContext) {
- return cache.compute(
- jobContext.getJobKey(),
- (id, configMapOpt) -> {
- // If in the cache and valid simply return
- if (configMapOpt != null &&
configMapOpt.isPresent()) {
- return configMapOpt;
- }
- // Otherwise get or create
- return
Optional.of(getOrCreateConfigMapFromKubernetes(jobContext));
- })
- .get()
- .getData();
- }
-
@VisibleForTesting
- protected Optional<ConfigMap> getConfigMapFromKubernetes(
- KubernetesJobAutoScalerContext jobContext) {
+ ConfigMapView getConfigMapFromKubernetes(KubernetesJobAutoScalerContext
jobContext) {
HasMetadata cr = jobContext.getResource();
var meta = createCmObjectMeta(ResourceID.fromResource(cr));
- return getScalingInfoConfigMap(meta);
- }
-
- @Nonnull
- private ConfigMap getOrCreateConfigMapFromKubernetes(
- KubernetesJobAutoScalerContext jobContext) {
- HasMetadata cr = jobContext.getResource();
- var meta = createCmObjectMeta(ResourceID.fromResource(cr));
- return getScalingInfoConfigMap(meta).orElseGet(() ->
createConfigMap(cr, meta));
- }
-
- private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
- LOG.info("Creating scaling info config map");
- var cm = new ConfigMap();
- cm.setMetadata(meta);
- cm.addOwnerReference(cr);
- cm.setData(new HashMap<>());
- return kubernetesClient.resource(cm).create();
+ var configMapSkeleton = buildConfigMap(cr, meta);
+ return new ConfigMapView(configMapSkeleton,
kubernetesClient::resource);
}
private ObjectMeta createCmObjectMeta(ResourceID uid) {
@@ -157,17 +119,16 @@ public class ConfigMapStore {
return objectMeta;
}
- private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta)
{
- return Optional.ofNullable(
- kubernetesClient
- .configMaps()
- .inNamespace(objectMeta.getNamespace())
- .withName(objectMeta.getName())
- .get());
+ private ConfigMap buildConfigMap(HasMetadata cr, ObjectMeta meta) {
+ var cm = new ConfigMap();
+ cm.setMetadata(meta);
+ cm.addOwnerReference(cr);
+ cm.setData(new HashMap<>());
+ return cm;
}
@VisibleForTesting
- protected ConcurrentHashMap<ResourceID, Optional<ConfigMap>> getCache() {
+ protected ConcurrentHashMap<ResourceID, ConfigMapView> getCache() {
return cache;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
new file mode 100644
index 00000000..8a6a37c6
--- /dev/null
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+class ConfigMapView {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigMapView.class);
+
+ enum State {
+ /** ConfigMap is only stored locally, not created in Kubernetes yet. */
+ NEEDS_CREATE,
+ /** ConfigMap exists in Kubernetes but there are newer local changes.
*/
+ NEEDS_UPDATE,
+ /** ConfigMap view reflects the actual contents of Kubernetes
ConfigMap. */
+ UP_TO_DATE
+ }
+
+ private State state;
+
+ private ConfigMap configMap;
+
+ private final Function<ConfigMap, Resource<ConfigMap>> resourceRetriever;
+
+ public ConfigMapView(
+ ConfigMap configMapSkeleton,
+ Function<ConfigMap, Resource<ConfigMap>> resourceRetriever) {
+ var existingConfigMap =
resourceRetriever.apply(configMapSkeleton).get();
+ if (existingConfigMap != null) {
+ refreshConfigMap(existingConfigMap);
+ } else {
+ this.configMap = configMapSkeleton;
+ this.state = State.NEEDS_CREATE;
+ }
+ this.resourceRetriever = resourceRetriever;
+ }
+
+ public String get(String key) {
+ return configMap.getData().get(key);
+ }
+
+ public void put(String key, String value) {
+ configMap.getData().put(key, value);
+ requireUpdate();
+ }
+
+ public void removeKey(String key) {
+ var oldKey = configMap.getData().remove(key);
+ if (oldKey != null) {
+ requireUpdate();
+ }
+ }
+
+ public void clear() {
+ if (configMap.getData().isEmpty()) {
+ return;
+ }
+ configMap.getData().clear();
+ requireUpdate();
+ }
+
+ public void flush() {
+ if (state == State.UP_TO_DATE) {
+ return;
+ }
+ Resource<ConfigMap> resource = resourceRetriever.apply(configMap);
+ if (state == State.NEEDS_UPDATE) {
+ refreshConfigMap(resource.update());
+ } else if (state == State.NEEDS_CREATE) {
+ LOG.info("Creating config map {}",
configMap.getMetadata().getName());
+ refreshConfigMap(resource.create());
+ }
+ }
+
+ private void refreshConfigMap(ConfigMap configMap) {
+ Preconditions.checkNotNull(configMap);
+ this.configMap = configMap;
+ this.state = State.UP_TO_DATE;
+ }
+
+ private void requireUpdate() {
+ if (state != State.NEEDS_CREATE) {
+ state = State.NEEDS_UPDATE;
+ }
+ }
+
+ @VisibleForTesting
+ public Map<String, String> getDataReadOnly() {
+ return Collections.unmodifiableMap(configMap.getData());
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
similarity index 97%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
rename to
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 27e3083f..0e2ce1c6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.ScalingSummary;
@@ -23,6 +23,7 @@ import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.configuration.ConfigurationUtils;
+import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.core.JacksonException;
@@ -163,6 +164,11 @@ public class KubernetesAutoScalerStateStore
configMapStore.removeSerializedState(jobContext,
PARALLELISM_OVERRIDES_KEY);
}
+ @Override
+ public void clearAll(KubernetesJobAutoScalerContext jobContext) {
+ configMapStore.clearAll(jobContext);
+ }
+
@Override
public void flush(KubernetesJobAutoScalerContext jobContext) {
trimHistoryToMaxCmSize(jobContext);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
index bed6a6ab..ccee852c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandlerTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
+import
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
similarity index 51%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
index 0e42799e..9c6b8fb2 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStoreTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/ConfigMapStoreTest.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
+import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.mock.Whitebox;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Optional;
-
import static
org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,12 +42,21 @@ public class ConfigMapStoreTest {
KubernetesMockServer mockWebServer;
+ ConfigMapStore configMapStore;
+
+ KubernetesJobAutoScalerContext ctx;
+
+ @BeforeEach
+ public void setup() {
+ configMapStore = new ConfigMapStore(kubernetesClient);
+ ctx = createContext("cr1", kubernetesClient);
+ }
+
@Test
void testCaching() {
KubernetesJobAutoScalerContext ctx1 = createContext("cr1",
kubernetesClient);
KubernetesJobAutoScalerContext ctx2 = createContext("cr2",
kubernetesClient);
- var configMapStore = new ConfigMapStore(kubernetesClient);
assertEquals(0, mockWebServer.getRequestCount());
String key1 = "key1";
@@ -56,6 +66,7 @@ public class ConfigMapStoreTest {
String key3 = "key3";
String value3 = "value3";
assertThat(configMapStore.getSerializedState(ctx1, key1)).isEmpty();
+ // Retrieve configMap
assertEquals(1, mockWebServer.getRequestCount());
// Further gets should not go to K8s
@@ -63,18 +74,24 @@ public class ConfigMapStoreTest {
assertThat(configMapStore.getSerializedState(ctx1, key3)).isEmpty();
assertEquals(1, mockWebServer.getRequestCount());
- assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isEmpty();
+ // Manually trigger retrieval from Kubernetes
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isEmpty();
assertEquals(2, mockWebServer.getRequestCount());
+ // Putting does not go to Kubernetes, unless flushing.
configMapStore.putSerializedState(ctx1, key1, value1);
- assertEquals(4, mockWebServer.getRequestCount());
+ assertEquals(2, mockWebServer.getRequestCount());
// The put just update the data to cache, and shouldn't request kubernetes.
configMapStore.putSerializedState(ctx1, key2, value2);
- assertEquals(4, mockWebServer.getRequestCount());
+ assertEquals(2, mockWebServer.getRequestCount());
-
assertThat(configMapStore.getConfigMapFromKubernetes(ctx1)).isPresent();
- assertThat(configMapStore.getConfigMapFromKubernetes(ctx2)).isEmpty();
+ // Flush!
+ configMapStore.flush(ctx1);
+ assertEquals(3, mockWebServer.getRequestCount());
+
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx1).getDataReadOnly()).isNotEmpty();
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx2).getDataReadOnly()).isEmpty();
assertThat(configMapStore.getSerializedState(ctx1, key1)).isPresent();
assertThat(configMapStore.getSerializedState(ctx2, key1)).isEmpty();
@@ -97,21 +114,23 @@ public class ConfigMapStoreTest {
void testErrorHandling() {
KubernetesJobAutoScalerContext ctx = createContext("cr1",
kubernetesClient);
- var configMapStore = new ConfigMapStore(kubernetesClient);
// Test for the invalid flush.
configMapStore.flush(ctx);
assertEquals(0, mockWebServer.getRequestCount());
configMapStore.putSerializedState(ctx, "a", "1");
- Optional<ConfigMap> configMapOpt =
configMapStore.getCache().get(ctx.getJobKey());
- assertThat(configMapOpt).isPresent();
- assertEquals(2, mockWebServer.getRequestCount());
+ ConfigMap configMap =
+ (ConfigMap)
+ Whitebox.getInternalState(
+ configMapStore.getCache().get(ctx.getJobKey()), "configMap");
+ assertThat(configMap.getData()).isNotEmpty();
+ assertEquals(1, mockWebServer.getRequestCount());
// Modify the autoscaler info in the background
- var cm = ReconciliationUtils.clone(configMapOpt.get());
+ var cm = ReconciliationUtils.clone(configMap);
cm.getData().put("a", "2");
- kubernetesClient.resource(cm).update();
+ kubernetesClient.resource(cm).create();
// Replace should throw an error due to the modification
assertThrows(KubernetesClientException.class, () ->
configMapStore.flush(ctx));
@@ -120,4 +139,81 @@ public class ConfigMapStoreTest {
// Make sure we can get the new version
assertThat(configMapStore.getSerializedState(ctx, "a")).contains("2");
}
+
+ @Test
+ void testMinimalAmountOfFlushing() {
+ KubernetesJobAutoScalerContext ctx = createContext("cr1",
kubernetesClient);
+ var key = "key";
+ var value = "value";
+
+ configMapStore.getSerializedState(ctx, key);
+ assertEquals(1, mockWebServer.getRequestCount());
+
+ configMapStore.putSerializedState(ctx, key, value);
+ assertEquals(1, mockWebServer.getRequestCount());
+
+ configMapStore.flush(ctx);
+ assertEquals(2, mockWebServer.getRequestCount());
+
+ // Get from cache
+ assertThat(configMapStore.getSerializedState(ctx,
key)).hasValue(value);
+ assertEquals(2, mockWebServer.getRequestCount());
+
+ configMapStore.removeSerializedState(ctx, key);
+ assertEquals(2, mockWebServer.getRequestCount());
+
+ configMapStore.flush(ctx);
+ assertEquals(3, mockWebServer.getRequestCount());
+
+ // Subsequent flushes do not trigger an API call
+ configMapStore.flush(ctx);
+ assertEquals(3, mockWebServer.getRequestCount());
+ }
+
+ @Test
+ public void testDiscardAllState() {
+ configMapStore.putSerializedState(
+ ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY,
"state1");
+ configMapStore.putSerializedState(
+ ctx, KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY,
"state2");
+ configMapStore.putSerializedState(
+ ctx, KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY,
"state3");
+
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
+ .isPresent();
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
+ .isPresent();
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY))
+ .isPresent();
+
+ configMapStore.flush(ctx);
+
+ configMapStore.clearAll(ctx);
+
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
+ .isEmpty();
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
+ .isEmpty();
+ assertThat(
+ configMapStore.getSerializedState(
+ ctx,
KubernetesAutoScalerStateStore.PARALLELISM_OVERRIDES_KEY))
+ .isEmpty();
+
+ // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();
+
+ configMapStore.flush(ctx);
+
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty();
+ }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
similarity index 87%
rename from
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
rename to
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
index f4faaa55..561e5448 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.kubernetes.operator.autoscaler.state;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -43,9 +44,9 @@ import java.util.TreeMap;
import static
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
import static
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
import static
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
-import static
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics;
-import static
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore.serializeScalingHistory;
import static
org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext;
+import static
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics;
+import static
org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeScalingHistory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -330,4 +331,40 @@ public class KubernetesAutoScalerStateStoreTest {
ctx,
KubernetesAutoScalerStateStore.SCALING_HISTORY_KEY))
.isEmpty();
}
+
+ @Test
+ public void testDiscardAllState() {
+ stateStore.storeCollectedMetrics(
+ ctx, new TreeMap<>(Map.of(Instant.now(), new
CollectedMetrics())));
+ stateStore.storeScalingHistory(
+ ctx,
+ Map.of(
+ new JobVertexID(),
+ new TreeMap<>(Map.of(Instant.now(), new
ScalingSummary()))));
+ stateStore.storeParallelismOverrides(ctx, Map.of(new
JobVertexID().toHexString(), "23"));
+
+ assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+ assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+ assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+ stateStore.flush(ctx);
+
+ assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+ assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+ assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+ stateStore.clearAll(ctx);
+
+ assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+ assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
+ assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+
+ // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();
+
+ stateStore.flush(ctx);
+
+ // Contents should be removed from Kubernetes
+ assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isEmpty();
+ }
}