This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit c34fe4f41a2f684da01d84ab0d761ec669af83f6 Author: Rui Fan <[email protected]> AuthorDate: Fri Jan 5 18:39:45 2024 +0800 [FLINK-33450][autoscaler] Extract the AbstractAutoScalerStateStoreTest and add the InMemoryAutoScalerStateStoreTest --- .../state/AbstractAutoScalerStateStoreTest.java | 203 +++++++++++++++++++++ .../state/InMemoryAutoScalerStateStoreTest.java | 46 +++++ flink-kubernetes-operator/pom.xml | 9 + .../state/KubernetesAutoScalerStateStoreTest.java | 181 +++--------------- 4 files changed, 284 insertions(+), 155 deletions(-) diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java new file mode 100644 index 00000000..dcab0c4c --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/AbstractAutoScalerStateStoreTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.state; + +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; +import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; +import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList; +import static org.assertj.core.api.Assertions.assertThat; + +/** Abstract ITCase for {@link AutoScalerStateStore}. */ +public abstract class AbstractAutoScalerStateStoreTest< + KEY, Context extends JobAutoScalerContext<KEY>> { + + protected AutoScalerStateStore<KEY, Context> stateStore; + + protected Context ctx; + + /** Create autoscaler state store based on the physical storage. */ + protected abstract AutoScalerStateStore<KEY, Context> createPhysicalAutoScalerStateStore() + throws Exception; + + /** + * Create autoscaler state store based on the cache if the state store has cache. It creates + * physical state store by default. + */ + protected AutoScalerStateStore<KEY, Context> createCachedAutoScalerStateStore() + throws Exception { + return createPhysicalAutoScalerStateStore(); + } + + protected abstract Context createJobContext(); + + /** Some preliminary setup before abstract class setup. */ + protected void preSetup() throws Exception {} + + @BeforeEach + void setup() throws Exception { + preSetup(); + stateStore = createCachedAutoScalerStateStore(); + ctx = createJobContext(); + } + + @Test + void testTopologyUpdate() throws Exception { + var v1 = new JobVertexID(); + var v2 = new JobVertexID(); + var v3 = new JobVertexID(); + + var summaries = new HashMap<JobVertexID, ScalingSummary>(); + summaries.put(v1, new ScalingSummary(1, 2, null)); + summaries.put(v2, new ScalingSummary(1, 2, null)); + + var now = Instant.now(); + + addToScalingHistoryAndStore(stateStore, ctx, now, summaries); + stateStore.flush(ctx); + + Assertions.assertEquals( + summaries.keySet(), getTrimmedScalingHistory(stateStore, ctx, now).keySet()); + Assertions.assertEquals( + summaries.keySet(), + getTrimmedScalingHistory(createCachedAutoScalerStateStore(), ctx, now).keySet()); + Assertions.assertEquals( + summaries.keySet(), + getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), ctx, now).keySet()); + + updateVertexList(stateStore, ctx, now, Set.of(v2, v3)); + stateStore.flush(ctx); + + // Expect v1 to be removed + Assertions.assertEquals( + Set.of(v2), getTrimmedScalingHistory(stateStore, ctx, now).keySet()); + Assertions.assertEquals( + Set.of(v2), + getTrimmedScalingHistory(createCachedAutoScalerStateStore(), ctx, now).keySet()); + Assertions.assertEquals( + Set.of(v2), + getTrimmedScalingHistory(createPhysicalAutoScalerStateStore(), ctx, now).keySet()); + } + + @Test + void testHistorySizeConfigs() throws Exception { + var v1 = new JobVertexID(); + + var history = new HashMap<JobVertexID, ScalingSummary>(); + history.put(v1, new ScalingSummary(1, 2, null)); + + var conf = ctx.getConfiguration(); + conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2); + conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, Duration.ofSeconds(10)); + + var now = Instant.now(); + + // Verify count based expiration + addToScalingHistoryAndStore(stateStore, ctx, now, history); + Assertions.assertEquals(1, getTrimmedScalingHistory(stateStore, ctx, now).get(v1).size()); + + addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(1)), history); + addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(2)), history); + + Assertions.assertEquals( + 2, + getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(2))) + .get(v1) + .size()); + Assertions.assertEquals( + Set.of(now.plus(Duration.ofSeconds(1)), now.plus(Duration.ofSeconds(2))), + getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(2))) + .get(v1) + .keySet()); + + // Verify time based expiration + addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(15)), history); + stateStore.flush(ctx); + + Assertions.assertEquals( + 1, + getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(15))) + .get(v1) + .size()); + Assertions.assertEquals( + Set.of(now.plus(Duration.ofSeconds(15))), + getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(15))) + .get(v1) + .keySet()); + Assertions.assertEquals( + Set.of(now.plus(Duration.ofSeconds(15))), + getTrimmedScalingHistory( + createCachedAutoScalerStateStore(), + ctx, + now.plus(Duration.ofSeconds(15))) + .get(v1) + .keySet()); + Assertions.assertEquals( + Set.of(now.plus(Duration.ofSeconds(15))), + getTrimmedScalingHistory( + createPhysicalAutoScalerStateStore(), + ctx, + now.plus(Duration.ofSeconds(15))) + .get(v1) + .keySet()); + } + + @Test + protected void testDiscardAllState() throws Exception { + stateStore.storeCollectedMetrics( + ctx, new TreeMap<>(Map.of(Instant.now(), new CollectedMetrics()))); + stateStore.storeScalingHistory( + ctx, + Map.of( + new JobVertexID(), + new TreeMap<>(Map.of(Instant.now(), new ScalingSummary())))); + stateStore.storeParallelismOverrides(ctx, Map.of(new JobVertexID().toHexString(), "23")); + + assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); + + stateStore.flush(ctx); + + assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); + + stateStore.clearAll(ctx); + + assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty(); + assertThat(stateStore.getScalingHistory(ctx)).isEmpty(); + assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty(); + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java new file mode 100644 index 00000000..a0bfde9f --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStoreTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; + +/** Test for {@link InMemoryAutoScalerStateStore}. */ +class InMemoryAutoScalerStateStoreTest + extends AbstractAutoScalerStateStoreTest<JobID, JobAutoScalerContext<JobID>> { + + private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; + + @Override + protected void preSetup() { + stateStore = new InMemoryAutoScalerStateStore<>(); + } + + @Override + protected AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> + createPhysicalAutoScalerStateStore() { + return stateStore; + } + + @Override + protected JobAutoScalerContext<JobID> createJobContext() { + return createDefaultJobAutoScalerContext(); + } +} diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml index ebdb2fbc..02e739af 100644 --- a/flink-kubernetes-operator/pom.xml +++ b/flink-kubernetes-operator/pom.xml @@ -153,6 +153,15 @@ under the License. </dependency> <!-- Test --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-autoscaler</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java index da7aef48..b34eb785 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStoreTest.java @@ -18,32 +18,30 @@ package org.apache.flink.kubernetes.operator.autoscaler.state; import org.apache.flink.autoscaler.ScalingSummary; -import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.state.AbstractAutoScalerStateStoreTest; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; -import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList; import static org.apache.flink.kubernetes.operator.autoscaler.TestingKubernetesAutoscalerUtils.createContext; import static org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeEvaluatedMetrics; import static org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore.serializeScalingHistory; @@ -53,141 +51,37 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for {@link KubernetesAutoScalerStateStore}. */ @EnableKubernetesMockClient(crud = true) -public class KubernetesAutoScalerStateStoreTest { +public class KubernetesAutoScalerStateStoreTest + extends AbstractAutoScalerStateStoreTest<ResourceID, KubernetesJobAutoScalerContext> { KubernetesClient kubernetesClient; ConfigMapStore configMapStore; - KubernetesAutoScalerStateStore stateStore; - - KubernetesJobAutoScalerContext ctx; - - @BeforeEach - void setup() { - configMapStore = new ConfigMapStore(kubernetesClient); - stateStore = new KubernetesAutoScalerStateStore(configMapStore); - ctx = createContext("cr1", kubernetesClient); + @Override + protected AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> + createPhysicalAutoScalerStateStore() { + return new KubernetesAutoScalerStateStore(new ConfigMapStore(kubernetesClient)); } - @Test - public void testTopologyUpdate() throws Exception { - var v1 = new JobVertexID(); - var v2 = new JobVertexID(); - var v3 = new JobVertexID(); - - var summaries = new HashMap<JobVertexID, ScalingSummary>(); - summaries.put(v1, new ScalingSummary(1, 2, null)); - summaries.put(v2, new ScalingSummary(1, 2, null)); - - var now = Instant.now(); - - addToScalingHistoryAndStore(stateStore, ctx, now, summaries); - stateStore.flush(ctx); - - Assertions.assertEquals( - summaries.keySet(), getTrimmedScalingHistory(stateStore, ctx, now).keySet()); - Assertions.assertEquals( - summaries.keySet(), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore(configMapStore), ctx, now) - .keySet()); - Assertions.assertEquals( - summaries.keySet(), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore( - new ConfigMapStore(kubernetesClient)), - ctx, - now) - .keySet()); - - updateVertexList(stateStore, ctx, now, Set.of(v2, v3)); - stateStore.flush(ctx); - - // Expect v1 to be removed - Assertions.assertEquals( - Set.of(v2), getTrimmedScalingHistory(stateStore, ctx, now).keySet()); - Assertions.assertEquals( - Set.of(v2), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore(configMapStore), ctx, now) - .keySet()); - Assertions.assertEquals( - Set.of(v2), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore( - new ConfigMapStore(kubernetesClient)), - ctx, - now) - .keySet()); + @Override + protected AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> + createCachedAutoScalerStateStore() { + return new KubernetesAutoScalerStateStore(configMapStore); } - @Test - public void testHistorySizeConfigs() throws Exception { - var v1 = new JobVertexID(); - - var history = new HashMap<JobVertexID, ScalingSummary>(); - history.put(v1, new ScalingSummary(1, 2, null)); - - var conf = ctx.getConfiguration(); - conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_COUNT, 2); - conf.set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, Duration.ofSeconds(10)); - - var now = Instant.now(); - - // Verify count based expiration - addToScalingHistoryAndStore(stateStore, ctx, now, history); - Assertions.assertEquals(1, getTrimmedScalingHistory(stateStore, ctx, now).get(v1).size()); - - addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(1)), history); - addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(2)), history); - - Assertions.assertEquals( - 2, - getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(2))) - .get(v1) - .size()); - Assertions.assertEquals( - Set.of(now.plus(Duration.ofSeconds(1)), now.plus(Duration.ofSeconds(2))), - getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(2))) - .get(v1) - .keySet()); - - // Verify time based expiration - addToScalingHistoryAndStore(stateStore, ctx, now.plus(Duration.ofSeconds(15)), history); - stateStore.flush(ctx); + @Override + protected KubernetesJobAutoScalerContext createJobContext() { + return createContext("cr1", kubernetesClient); + } - Assertions.assertEquals( - 1, - getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(15))) - .get(v1) - .size()); - Assertions.assertEquals( - Set.of(now.plus(Duration.ofSeconds(15))), - getTrimmedScalingHistory(stateStore, ctx, now.plus(Duration.ofSeconds(15))) - .get(v1) - .keySet()); - Assertions.assertEquals( - Set.of(now.plus(Duration.ofSeconds(15))), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore(configMapStore), - ctx, - now.plus(Duration.ofSeconds(15))) - .get(v1) - .keySet()); - Assertions.assertEquals( - Set.of(now.plus(Duration.ofSeconds(15))), - getTrimmedScalingHistory( - new KubernetesAutoScalerStateStore( - new ConfigMapStore(kubernetesClient)), - ctx, - now.plus(Duration.ofSeconds(15))) - .get(v1) - .keySet()); + @Override + protected void preSetup() { + configMapStore = new ConfigMapStore(kubernetesClient); } @Test - public void testCompressionMigration() throws Exception { + void testCompressionMigration() throws Exception { var jobUpdateTs = Instant.now(); var v1 = new JobVertexID(); @@ -235,7 +129,7 @@ public class KubernetesAutoScalerStateStoreTest { } @Test - public void testMetricsTrimming() throws Exception { + void testMetricsTrimming() throws Exception { var v1 = new JobVertexID(); Random rnd = new Random(); @@ -287,7 +181,7 @@ public class KubernetesAutoScalerStateStoreTest { .length() < KubernetesAutoScalerStateStore.MAX_CM_BYTES); - stateStore.trimHistoryToMaxCmSize(ctx); + ((KubernetesAutoScalerStateStore) stateStore).trimHistoryToMaxCmSize(ctx); assertTrue( configMapStore .getSerializedState( @@ -306,7 +200,7 @@ public class KubernetesAutoScalerStateStoreTest { } @Test - public void testDiscardInvalidHistory() throws Exception { + void testDiscardInvalidHistory() throws Exception { configMapStore.putSerializedState( ctx, KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY, "invalid"); configMapStore.putSerializedState( @@ -336,31 +230,8 @@ public class KubernetesAutoScalerStateStoreTest { } @Test - public void testDiscardAllState() { - stateStore.storeCollectedMetrics( - ctx, new TreeMap<>(Map.of(Instant.now(), new CollectedMetrics()))); - stateStore.storeScalingHistory( - ctx, - Map.of( - new JobVertexID(), - new TreeMap<>(Map.of(Instant.now(), new ScalingSummary())))); - stateStore.storeParallelismOverrides(ctx, Map.of(new JobVertexID().toHexString(), "23")); - - assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); - assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); - assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); - - stateStore.flush(ctx); - - assertThat(stateStore.getCollectedMetrics(ctx)).isNotEmpty(); - assertThat(stateStore.getScalingHistory(ctx)).isNotEmpty(); - assertThat(stateStore.getParallelismOverrides(ctx)).isNotEmpty(); - - stateStore.clearAll(ctx); - - assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty(); - assertThat(stateStore.getScalingHistory(ctx)).isEmpty(); - assertThat(stateStore.getParallelismOverrides(ctx)).isEmpty(); + protected void testDiscardAllState() throws Exception { + super.testDiscardAllState(); // We haven't flushed the clear operation, ConfigMap in Kubernetes should not be empty assertThat(configMapStore.getConfigMapFromKubernetes(ctx).getDataReadOnly()).isNotEmpty();
