This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit c34fe4f41a2f684da01d84ab0d761ec669af83f6
Author: Rui Fan <[email protected]>
AuthorDate: Fri Jan 5 18:39:45 2024 +0800

    [FLINK-33450][autoscaler] Extract the AbstractAutoScalerStateStoreTest and add the InMemoryAutoScalerStateStoreTest
---
 .../state/AbstractAutoScalerStateStoreTest.java    | 203 +++++++++++++++++++++
 .../state/InMemoryAutoScalerStateStoreTest.java    |  46 +++++
 flink-kubernetes-operator/pom.xml                  |   9 +
 .../state/KubernetesAutoScalerStateStoreTest.java  | 181 +++---------------
 4 files changed, 284 insertions(+), 155 deletions(-)

diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
new file mode 100644
index 00000000..dcab0c4c
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
+import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
+import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Abstract ITCase for {@link AutoScalerStateStore}. */
+public abstract class AbstractAutoScalerStateStoreTest<
+        KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    protected AutoScalerStateStore<KEY, Context> stateStore;
+
+    protected Context ctx;
+
+    /** Create autoscaler state store based on the physical storage. */
+    protected abstract AutoScalerStateStore<KEY, Context> 
createPhysicalAutoScalerStateStore()
+            throws Exception;
+
+    /**
+     * Create autoscaler state store based on the cache if the state store has cache. It creates
+     * physical state store by default.
+     */
+    protected AutoScalerStateStore<KEY, Context> 
createCachedAutoScalerStateStore()
+            throws Exception {
+        return createPhysicalAutoScalerStateStore();
+    }
+
+    protected abstract Context createJobContext();
+
+    /** Some preliminary setup before abstract class setup. */
+    protected void preSetup() throws Exception {}
+
+    @BeforeEach
+    void setup() throws Exception {
+        preSetup();
+        stateStore = createCachedAutoScalerStateStore();
+        ctx = createJobContext();
+    }
+
+    @Test
+    void testTopologyUpdate() throws Exception {
+        var v1 = new JobVertexID();
+        var v2 = new JobVertexID();
+        var v3 = new JobVertexID();
+
+        var summaries = new HashMap<JobVertexID, ScalingSummary>();
+        summaries.put(v1, new ScalingSummary(1, 2, null));
+        summaries.put(v2, new ScalingSummary(1, 2, null));
+
+        var now = Instant.now();
+
+        addToScalingHistoryAndStore(stateStore, ctx, now, summaries);
+        stateStore.flush(ctx);
+
+        Assertions.assertEquals(
+                summaries.keySet(), getTrimmedScalingHistory(stateStore, ctx, 
now).keySet());
+        Assertions.assertEquals(
+                summaries.keySet(),
+                getTrimmedScalingHistory(createCachedAutoScalerStateStore(), 
ctx, now).keySet());
+        Assertions.assertEquals(
+                summaries.keySet(),
+                getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), 
ctx, now).keySet());
+
+        updateVertexList(stateStore, ctx, now, Set.of(v2, v3));
+        stateStore.flush(ctx);
+
+        // Expect v1 to be removed
+        Assertions.assertEquals(
+                Set.of(v2), getTrimmedScalingHistory(stateStore, ctx, 
now).keySet());
+        Assertions.assertEquals(
+                Set.of(v2),
+                getTrimmedScalingHistory(createCachedAutoScalerStateStore(), 
ctx, now).keySet());
+        Assertions.assertEquals(
+                Set.of(v2),
+                getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), 
ctx, now).keySet());
+    }
+
+    @Test
+    void testHistorySizeConfigs() throws Exception {
+        var v1 = new JobVertexID();
+
+        var history = new HashMap<JobVertexID, ScalingSummary>();
+        history.put(v1, new ScalingSummary(1, 2, null));
+
+        var conf = ctx.getConfiguration();
+        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2);
+        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, 
Duration.ofSeconds(10));
+
+        var now = Instant.now();
+
+        // Verify count based expiration
+        addToScalingHistoryAndStore(stateStore, ctx, now, history);
+        Assertions.assertEquals(1, getTrimmedScalingHistory(stateStore, ctx, 
now).get(v1).size());
+
+        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(1)), history);
+        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)), history);
+
+        Assertions.assertEquals(
+                2,
+                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)))
+                        .get(v1)
+                        .size());
+        Assertions.assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(1)), 
now.plus(Duration.ofSeconds(2))),
+                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)))
+                        .get(v1)
+                        .keySet());
+
+        // Verify time based expiration
+        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)), history);
+        stateStore.flush(ctx);
+
+        Assertions.assertEquals(
+                1,
+                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)))
+                        .get(v1)
+                        .size());
+        Assertions.assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(15))),
+                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)))
+                        .get(v1)
+                        .keySet());
+        Assertions.assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(15))),
+                getTrimmedScalingHistory(
+                                createCachedAutoScalerStateStore(),
+                                ctx,
+                                now.plus(Duration.ofSeconds(15)))
+                        .get(v1)
+                        .keySet());
+        Assertions.assertEquals(
+                Set.of(now.plus(Duration.ofSeconds(15))),
+                getTrimmedScalingHistory(
+                                createPhysicalAutoScalerStateStore(),
+                                ctx,
+                                now.plus(Duration.ofSeconds(15)))
+                        .get(v1)
+                        .keySet());
+    }
+
+    @Test
+    protected void testDiscardAllState() throws Exception {
+        stateStore.storeCollectedMetrics(
+                ctx, new TreeMap<>(Map.of(Instant.now(), new 
CollectedMetrics())));
+        stateStore.storeScalingHistory(
+                ctx,
+                Map.of(
+                        new JobVertexID(),
+                        new TreeMap<>(Map.of(Instant.now(), new 
ScalingSummary()))));
+        stateStore.storeParallelismOverrides(ctx, Map.of(new 
JobVertexID().toHexString(), "23"));
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+        stateStore.flush(ctx);
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
+
+        stateStore.clearAll(ctx);
+
+        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
+        assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
+        assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+    }
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java
new file mode 100644
index 00000000..a0bfde9f
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+
+/** Test for {@link InMemoryAutoScalerStateStore}. */
+class InMemoryAutoScalerStateStoreTest
+        extends AbstractAutoScalerStateStoreTest<JobID, 
JobAutoScalerContext<JobID>> {
+
+    private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+
+    @Override
+    protected void preSetup() {
+        stateStore = new InMemoryAutoScalerStateStore<>();
+    }
+
+    @Override
+    protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>>
+            createPhysicalAutoScalerStateStore() {
+        return stateStore;
+    }
+
+    @Override
+    protected JobAutoScalerContext<JobID> createJobContext() {
+        return createDefaultJobAutoScalerContext();
+    }
+}
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index ebdb2fbc..02e739af 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -153,6 +153,15 @@ under the License.
         </dependency>
 
         <!-- Test -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
index da7aef48..b34eb785 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java
@@ -18,32 +18,30 @@
 package org.apache.flink.kubernetes.operator.autoscaler.state;
 
 import org.apache.flink.autoscaler.ScalingSummary;
-import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.state.AbstractAutoScalerStateStoreTest;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
 import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
-import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
 import static org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext;
 import static org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics;
 import static org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeScalingHistory;
@@ -53,141 +51,37 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for {@link KubernetesAutoScalerStateStore}. */
 @EnableKubernetesMockClient(crud = true)
-public class KubernetesAutoScalerStateStoreTest {
+public class KubernetesAutoScalerStateStoreTest
+        extends AbstractAutoScalerStateStoreTest<ResourceID, 
KubernetesJobAutoScalerContext> {
 
     KubernetesClient kubernetesClient;
 
     ConfigMapStore configMapStore;
 
-    KubernetesAutoScalerStateStore stateStore;
-
-    KubernetesJobAutoScalerContext ctx;
-
-    @BeforeEach
-    void setup() {
-        configMapStore = new ConfigMapStore(kubernetesClient);
-        stateStore = new KubernetesAutoScalerStateStore(configMapStore);
-        ctx = createContext("cr1", kubernetesClient);
+    @Override
+    protected AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext>
+            createPhysicalAutoScalerStateStore() {
+        return new KubernetesAutoScalerStateStore(new 
ConfigMapStore(kubernetesClient));
     }
 
-    @Test
-    public void testTopologyUpdate() throws Exception {
-        var v1 = new JobVertexID();
-        var v2 = new JobVertexID();
-        var v3 = new JobVertexID();
-
-        var summaries = new HashMap<JobVertexID, ScalingSummary>();
-        summaries.put(v1, new ScalingSummary(1, 2, null));
-        summaries.put(v2, new ScalingSummary(1, 2, null));
-
-        var now = Instant.now();
-
-        addToScalingHistoryAndStore(stateStore, ctx, now, summaries);
-        stateStore.flush(ctx);
-
-        Assertions.assertEquals(
-                summaries.keySet(), getTrimmedScalingHistory(stateStore, ctx, 
now).keySet());
-        Assertions.assertEquals(
-                summaries.keySet(),
-                getTrimmedScalingHistory(
-                                new 
KubernetesAutoScalerStateStore(configMapStore), ctx, now)
-                        .keySet());
-        Assertions.assertEquals(
-                summaries.keySet(),
-                getTrimmedScalingHistory(
-                                new KubernetesAutoScalerStateStore(
-                                        new ConfigMapStore(kubernetesClient)),
-                                ctx,
-                                now)
-                        .keySet());
-
-        updateVertexList(stateStore, ctx, now, Set.of(v2, v3));
-        stateStore.flush(ctx);
-
-        // Expect v1 to be removed
-        Assertions.assertEquals(
-                Set.of(v2), getTrimmedScalingHistory(stateStore, ctx, 
now).keySet());
-        Assertions.assertEquals(
-                Set.of(v2),
-                getTrimmedScalingHistory(
-                                new 
KubernetesAutoScalerStateStore(configMapStore), ctx, now)
-                        .keySet());
-        Assertions.assertEquals(
-                Set.of(v2),
-                getTrimmedScalingHistory(
-                                new KubernetesAutoScalerStateStore(
-                                        new ConfigMapStore(kubernetesClient)),
-                                ctx,
-                                now)
-                        .keySet());
+    @Override
+    protected AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext>
+            createCachedAutoScalerStateStore() {
+        return new KubernetesAutoScalerStateStore(configMapStore);
     }
 
-    @Test
-    public void testHistorySizeConfigs() throws Exception {
-        var v1 = new JobVertexID();
-
-        var history = new HashMap<JobVertexID, ScalingSummary>();
-        history.put(v1, new ScalingSummary(1, 2, null));
-
-        var conf = ctx.getConfiguration();
-        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2);
-        conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, 
Duration.ofSeconds(10));
-
-        var now = Instant.now();
-
-        // Verify count based expiration
-        addToScalingHistoryAndStore(stateStore, ctx, now, history);
-        Assertions.assertEquals(1, getTrimmedScalingHistory(stateStore, ctx, 
now).get(v1).size());
-
-        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(1)), history);
-        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)), history);
-
-        Assertions.assertEquals(
-                2,
-                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)))
-                        .get(v1)
-                        .size());
-        Assertions.assertEquals(
-                Set.of(now.plus(Duration.ofSeconds(1)), 
now.plus(Duration.ofSeconds(2))),
-                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(2)))
-                        .get(v1)
-                        .keySet());
-
-        // Verify time based expiration
-        addToScalingHistoryAndStore(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)), history);
-        stateStore.flush(ctx);
+    @Override
+    protected KubernetesJobAutoScalerContext createJobContext() {
+        return createContext("cr1", kubernetesClient);
+    }
 
-        Assertions.assertEquals(
-                1,
-                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)))
-                        .get(v1)
-                        .size());
-        Assertions.assertEquals(
-                Set.of(now.plus(Duration.ofSeconds(15))),
-                getTrimmedScalingHistory(stateStore, ctx, 
now.plus(Duration.ofSeconds(15)))
-                        .get(v1)
-                        .keySet());
-        Assertions.assertEquals(
-                Set.of(now.plus(Duration.ofSeconds(15))),
-                getTrimmedScalingHistory(
-                                new 
KubernetesAutoScalerStateStore(configMapStore),
-                                ctx,
-                                now.plus(Duration.ofSeconds(15)))
-                        .get(v1)
-                        .keySet());
-        Assertions.assertEquals(
-                Set.of(now.plus(Duration.ofSeconds(15))),
-                getTrimmedScalingHistory(
-                                new KubernetesAutoScalerStateStore(
-                                        new ConfigMapStore(kubernetesClient)),
-                                ctx,
-                                now.plus(Duration.ofSeconds(15)))
-                        .get(v1)
-                        .keySet());
+    @Override
+    protected void preSetup() {
+        configMapStore = new ConfigMapStore(kubernetesClient);
     }
 
     @Test
-    public void testCompressionMigration() throws Exception {
+    void testCompressionMigration() throws Exception {
         var jobUpdateTs = Instant.now();
         var v1 = new JobVertexID();
 
@@ -235,7 +129,7 @@ public class KubernetesAutoScalerStateStoreTest {
     }
 
     @Test
-    public void testMetricsTrimming() throws Exception {
+    void testMetricsTrimming() throws Exception {
         var v1 = new JobVertexID();
         Random rnd = new Random();
 
@@ -287,7 +181,7 @@ public class KubernetesAutoScalerStateStoreTest {
                                         .length()
                         < KubernetesAutoScalerStateStore.MAX_CM_BYTES);
 
-        stateStore.trimHistoryToMaxCmSize(ctx);
+        ((KubernetesAutoScalerStateStore) 
stateStore).trimHistoryToMaxCmSize(ctx);
         assertTrue(
                 configMapStore
                                         .getSerializedState(
@@ -306,7 +200,7 @@ public class KubernetesAutoScalerStateStoreTest {
     }
 
     @Test
-    public void testDiscardInvalidHistory() throws Exception {
+    void testDiscardInvalidHistory() throws Exception {
         configMapStore.putSerializedState(
                 ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY, 
"invalid");
         configMapStore.putSerializedState(
@@ -336,31 +230,8 @@ public class KubernetesAutoScalerStateStoreTest {
     }
 
     @Test
-    public void testDiscardAllState() {
-        stateStore.storeCollectedMetrics(
-                ctx, new TreeMap<>(Map.of(Instant.now(), new 
CollectedMetrics())));
-        stateStore.storeScalingHistory(
-                ctx,
-                Map.of(
-                        new JobVertexID(),
-                        new TreeMap<>(Map.of(Instant.now(), new 
ScalingSummary()))));
-        stateStore.storeParallelismOverrides(ctx, Map.of(new 
JobVertexID().toHexString(), "23"));
-
-        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
-        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
-        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
-
-        stateStore.flush(ctx);
-
-        assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty();
-        assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty();
-        assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty();
-
-        stateStore.clearAll(ctx);
-
-        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
-        assertThat(stateStore.getScalingHistory(ctx)).isEmpty();
-        assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty();
+    protected void testDiscardAllState() throws Exception {
+        super.testDiscardAllState();
 
         // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty
         
assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();

Reply via email to