This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 7a9bff78 [FLINK-37086] Clean up should remove all autoscaler state 7a9bff78 is described below commit 7a9bff780f46acc4b69542c505ecd978705df461 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Jan 10 16:15:49 2025 +0100 [FLINK-37086] Clean up should remove all autoscaler state --- .../jdbc/state/JdbcAutoScalerStateStore.java | 8 ++---- .../standalone/StandaloneAutoscalerExecutor.java | 30 +++++++++++++--------- .../StandaloneAutoscalerExecutorTest.java | 14 +++++----- .../org/apache/flink/autoscaler/JobAutoScaler.java | 4 +-- .../apache/flink/autoscaler/JobAutoScalerImpl.java | 15 +++++++---- .../apache/flink/autoscaler/NoopJobAutoscaler.java | 2 +- .../autoscaler/state/AutoScalerStateStore.java | 3 --- .../state/InMemoryAutoScalerStateStore.java | 18 +++++++++---- .../flink/autoscaler/JobAutoScalerImplTest.java | 21 ++++++++++++--- .../state/KubernetesAutoScalerStateStore.java | 5 ---- .../AbstractFlinkResourceReconciler.java | 2 +- 11 files changed, 73 insertions(+), 49 deletions(-) diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java index dfadc9ac..1ed3ad96 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java @@ -247,7 +247,8 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext< @Override public void clearAll(Context jobContext) { - jdbcStateStore.clearAll(getSerializeKey(jobContext)); + var serializedKey = getSerializeKey(jobContext); + jdbcStateStore.clearAll(serializedKey); } @Override @@ -255,11 +256,6 @@ public class JdbcAutoScalerStateStore<KEY, Context extends JobAutoScalerContext< jdbcStateStore.flush(getSerializeKey(jobContext)); } - @Override - public void removeInfoFromCache(KEY jobKey) { - jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey)); - } - private String getSerializeKey(Context jobContext) { return getSerializeKey(jobContext.getJobKey()); } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java index 87875ec1..a87de964 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java @@ -40,12 +40,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL; @@ -74,10 +77,10 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont private final Set<KEY> scalingJobKeys; /** - * Maintain a set of scaling job keys for the last control loop, it should be accessed at {@link + * Maintain a map of scaling job keys for the last control loop, it should be accessed at {@link * #scheduledExecutorService} thread. */ - private Set<KEY> lastScalingKeys; + private Map<KEY, Context> lastScaling; public StandaloneAutoscalerExecutor( @Nonnull Configuration conf, @@ -151,12 +154,12 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont throwable); } scalingJobKeys.remove(jobKey); - if (!lastScalingKeys.contains(jobKey)) { + if (!lastScaling.containsKey(jobKey)) { // Current job has been stopped. lastScalingKeys doesn't // contain jobKey means current job key was scaled in a // previous control loop, and current job is stopped in // the latest control loop. - autoScaler.cleanup(jobKey); + autoScaler.cleanup(jobContext); } }, scheduledExecutorService)); @@ -165,18 +168,21 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont } private void cleanupStoppedJob(Collection<Context> jobList) { - var currentScalingKeys = - jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet()); - if (lastScalingKeys != null) { - lastScalingKeys.removeAll(currentScalingKeys); - for (KEY jobKey : lastScalingKeys) { + var jobs = + jobList.stream() + .collect( + Collectors.toMap( + JobAutoScalerContext::getJobKey, Function.identity())); + if (lastScaling != null) { + jobs.keySet().forEach(lastScaling::remove); + for (Map.Entry<KEY, Context> job : lastScaling.entrySet()) { // Current job may be scaling, and cleanup should happen after scaling. - if (!scalingJobKeys.contains(jobKey)) { - autoScaler.cleanup(jobKey); + if (!scalingJobKeys.contains(job.getKey())) { + autoScaler.cleanup(job.getValue()); } } } - lastScalingKeys = currentScalingKeys; + lastScaling = new ConcurrentHashMap<>(jobs); } @VisibleForTesting diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java index caffdbbb..b17d4ba2 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java @@ -80,7 +80,7 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobKey) { + public void cleanup(JobAutoScalerContext<JobID> context) { fail("Should be called."); } }; @@ -126,7 +126,7 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext<JobID> context) { fail("Should be called."); } })) { @@ -164,7 +164,7 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext<JobID> context) { fail("Should be called."); } })) { @@ -221,7 +221,7 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext<JobID> context) { fail("Should be called."); } })) { @@ -255,7 +255,8 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext<JobID> context) { + var jobID = context.getJobKey(); cleanupCounter.put( jobID, cleanupCounter.getOrDefault(jobID, 0) + 1); } @@ -346,7 +347,8 @@ class StandaloneAutoscalerExecutorTest { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext<JobID> context) { + var jobID = context.getJobKey(); cleanupCounter.put( jobID, cleanupCounter.getOrDefault(jobID, 0) + 1); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java index f31fac10..9c5c3749 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java @@ -39,7 +39,7 @@ public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> { /** * Called when the job is deleted. * - * @param jobKey Job key. + * @param context Job context. */ - void cleanup(KEY jobKey); + void cleanup(Context context); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index f3af3baa..d0e29e67 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -118,12 +118,17 @@ public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> } @Override - public void cleanup(KEY jobKey) { + public void cleanup(Context ctx) { LOG.info("Cleaning up autoscaling meta data"); - metricsCollector.cleanup(jobKey); - lastEvaluatedMetrics.remove(jobKey); - flinkMetrics.remove(jobKey); - stateStore.removeInfoFromCache(jobKey); + metricsCollector.cleanup(ctx.getJobKey()); + lastEvaluatedMetrics.remove(ctx.getJobKey()); + flinkMetrics.remove(ctx.getJobKey()); + try { + stateStore.clearAll(ctx); + stateStore.flush(ctx); + } catch (Exception e) { + LOG.error("Error cleaning up autoscaling meta data for {}", ctx.getJobKey(), e); + } } @VisibleForTesting diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java index a7a66f14..91ebfd0c 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java @@ -24,5 +24,5 @@ public class NoopJobAutoscaler<KEY, Context extends JobAutoScalerContext<KEY>> public void scale(Context context) throws Exception {} @Override - public void cleanup(KEY jobKey) {} + public void cleanup(Context jobKey) {} } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java index 15581b75..64e4dd5b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -93,7 +93,4 @@ public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext< * was changed through this interface. */ void flush(Context jobContext) throws Exception; - - /** Clean up all information related to the current job. */ - void removeInfoFromCache(KEY jobKey); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java index cf6aa74f..c660f73b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.autoscaler.DelayedScaleDown; import org.apache.flink.autoscaler.JobAutoScalerContext; import org.apache.flink.autoscaler.ScalingSummary; @@ -34,6 +35,7 @@ import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; /** * State store based on the Java Heap, the state will be discarded after process restarts. @@ -177,10 +179,16 @@ public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerCont // The InMemory state store doesn't persist data. } - @Override - public void removeInfoFromCache(KEY jobKey) { - scalingHistoryStore.remove(jobKey); - collectedMetricsStore.remove(jobKey); - parallelismOverridesStore.remove(jobKey); + @VisibleForTesting + public boolean hasDataFor(Context jobContext) { + var k = jobContext.getJobKey(); + return Stream.of( + scalingHistoryStore, + parallelismOverridesStore, + collectedMetricsStore, + tmConfigOverrides, + scalingTrackingStore, + delayedScaleDownStore) + .anyMatch(m -> m.containsKey(k)); } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index c9d74e65..552971d7 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -27,7 +27,6 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.metrics.TestMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; -import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; @@ -72,7 +71,7 @@ public class JobAutoScalerImplTest { private JobAutoScalerContext<JobID> context; private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer; private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector; - private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; + private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore; @BeforeEach public void setup() { @@ -232,7 +231,12 @@ public class JobAutoScalerImplTest { void testParallelismOverrides() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + new TestingMetricsCollector<>(new JobTopology()), + null, + null, + eventCollector, + scalingRealizer, + stateStore); // Initially we should return empty overrides, do not crate any state assertThat(autoscaler.getParallelismOverrides(context)).isEmpty(); @@ -282,6 +286,17 @@ public class JobAutoScalerImplTest { context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "asd"); autoscaler.scale(context); assertParallelismOverrides(Map.of(v1, "1", v2, "2")); + + context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true"); + autoscaler.scale(context); + assertParallelismOverrides(Map.of(v1, "1", v2, "2")); + + // Make sure cleanup removes everything + assertTrue(stateStore.hasDataFor(context)); + autoscaler.cleanup(context); + assertFalse(stateStore.hasDataFor(context)); + autoscaler.scale(context); + assertParallelismOverrides(null); } @Test diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java index 1059b96a..da445973 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java @@ -266,11 +266,6 @@ public class KubernetesAutoScalerStateStore configMapStore.flush(jobContext); } - @Override - public void removeInfoFromCache(ResourceID resourceID) { - configMapStore.removeInfoFromCache(resourceID); - } - @SneakyThrows protected static String serializeScalingHistory( Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 757e1a7c..3b2048c9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -276,7 +276,7 @@ public abstract class AbstractFlinkResourceReconciler< @Override public DeleteControl cleanup(FlinkResourceContext<CR> ctx) { - autoscaler.cleanup(ResourceID.fromResource(ctx.getResource())); + autoscaler.cleanup(ctx.getJobAutoScalerContext()); return cleanupInternal(ctx); }