This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a9bff78 [FLINK-37086] Clean up should remove all autoscaler state
7a9bff78 is described below

commit 7a9bff780f46acc4b69542c505ecd978705df461
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Jan 10 16:15:49 2025 +0100

    [FLINK-37086] Clean up should remove all autoscaler state
---
 .../jdbc/state/JdbcAutoScalerStateStore.java       |  8 ++----
 .../standalone/StandaloneAutoscalerExecutor.java   | 30 +++++++++++++---------
 .../StandaloneAutoscalerExecutorTest.java          | 14 +++++-----
 .../org/apache/flink/autoscaler/JobAutoScaler.java |  4 +--
 .../apache/flink/autoscaler/JobAutoScalerImpl.java | 15 +++++++----
 .../apache/flink/autoscaler/NoopJobAutoscaler.java |  2 +-
 .../autoscaler/state/AutoScalerStateStore.java     |  3 ---
 .../state/InMemoryAutoScalerStateStore.java        | 18 +++++++++----
 .../flink/autoscaler/JobAutoScalerImplTest.java    | 21 ++++++++++++---
 .../state/KubernetesAutoScalerStateStore.java      |  5 ----
 .../AbstractFlinkResourceReconciler.java           |  2 +-
 11 files changed, 73 insertions(+), 49 deletions(-)

diff --git 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
index dfadc9ac..1ed3ad96 100644
--- 
a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
+++ 
b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java
@@ -247,7 +247,8 @@ public class JdbcAutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
 
     @Override
     public void clearAll(Context jobContext) {
-        jdbcStateStore.clearAll(getSerializeKey(jobContext));
+        var serializedKey = getSerializeKey(jobContext);
+        jdbcStateStore.clearAll(serializedKey);
     }
 
     @Override
@@ -255,11 +256,6 @@ public class JdbcAutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
         jdbcStateStore.flush(getSerializeKey(jobContext));
     }
 
-    @Override
-    public void removeInfoFromCache(KEY jobKey) {
-        jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey));
-    }
-
     private String getSerializeKey(Context jobContext) {
         return getSerializeKey(jobContext.getJobKey());
     }
diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index 87875ec1..a87de964 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -40,12 +40,15 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
@@ -74,10 +77,10 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     private final Set<KEY> scalingJobKeys;
 
     /**
-     * Maintain a set of scaling job keys for the last control loop, it should 
be accessed at {@link
+     * Maintain a map from job key to job context for the last control loop. It should 
be accessed on the {@link
      * #scheduledExecutorService} thread.
      */
-    private Set<KEY> lastScalingKeys;
+    private Map<KEY, Context> lastScaling;
 
     public StandaloneAutoscalerExecutor(
             @Nonnull Configuration conf,
@@ -151,12 +154,12 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
                                                     throwable);
                                         }
                                         scalingJobKeys.remove(jobKey);
-                                        if (!lastScalingKeys.contains(jobKey)) 
{
+                                        if (!lastScaling.containsKey(jobKey)) {
                                             // Current job has been stopped. 
lastScalingKeys doesn't
                                             // contain jobKey means current 
job key was scaled in a
                                             // previous control loop, and 
current job is stopped in
                                             // the latest control loop.
-                                            autoScaler.cleanup(jobKey);
+                                            autoScaler.cleanup(jobContext);
                                         }
                                     },
                                     scheduledExecutorService));
@@ -165,18 +168,21 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     }
 
     private void cleanupStoppedJob(Collection<Context> jobList) {
-        var currentScalingKeys =
-                
jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet());
-        if (lastScalingKeys != null) {
-            lastScalingKeys.removeAll(currentScalingKeys);
-            for (KEY jobKey : lastScalingKeys) {
+        var jobs =
+                jobList.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        JobAutoScalerContext::getJobKey, 
Function.identity()));
+        if (lastScaling != null) {
+            jobs.keySet().forEach(lastScaling::remove);
+            for (Map.Entry<KEY, Context> job : lastScaling.entrySet()) {
                 // Current job may be scaling, and cleanup should happen after 
scaling.
-                if (!scalingJobKeys.contains(jobKey)) {
-                    autoScaler.cleanup(jobKey);
+                if (!scalingJobKeys.contains(job.getKey())) {
+                    autoScaler.cleanup(job.getValue());
                 }
             }
         }
-        lastScalingKeys = currentScalingKeys;
+        lastScaling = new ConcurrentHashMap<>(jobs);
     }
 
     @VisibleForTesting
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
index caffdbbb..b17d4ba2 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java
@@ -80,7 +80,7 @@ class StandaloneAutoscalerExecutorTest {
                     }
 
                     @Override
-                    public void cleanup(JobID jobKey) {
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
                         fail("Should be called.");
                     }
                 };
@@ -126,7 +126,7 @@ class StandaloneAutoscalerExecutorTest {
                             }
 
                             @Override
-                            public void cleanup(JobID jobID) {
+                            public void cleanup(JobAutoScalerContext<JobID> 
context) {
                                 fail("Should be called.");
                             }
                         })) {
@@ -164,7 +164,7 @@ class StandaloneAutoscalerExecutorTest {
                             }
 
                             @Override
-                            public void cleanup(JobID jobID) {
+                            public void cleanup(JobAutoScalerContext<JobID> 
context) {
                                 fail("Should be called.");
                             }
                         })) {
@@ -221,7 +221,7 @@ class StandaloneAutoscalerExecutorTest {
                             }
 
                             @Override
-                            public void cleanup(JobID jobID) {
+                            public void cleanup(JobAutoScalerContext<JobID> 
context) {
                                 fail("Should be called.");
                             }
                         })) {
@@ -255,7 +255,8 @@ class StandaloneAutoscalerExecutorTest {
                             }
 
                             @Override
-                            public void cleanup(JobID jobID) {
+                            public void cleanup(JobAutoScalerContext<JobID> 
context) {
+                                var jobID = context.getJobKey();
                                 cleanupCounter.put(
                                         jobID, 
cleanupCounter.getOrDefault(jobID, 0) + 1);
                             }
@@ -346,7 +347,8 @@ class StandaloneAutoscalerExecutorTest {
                             }
 
                             @Override
-                            public void cleanup(JobID jobID) {
+                            public void cleanup(JobAutoScalerContext<JobID> 
context) {
+                                var jobID = context.getJobKey();
                                 cleanupCounter.put(
                                         jobID, 
cleanupCounter.getOrDefault(jobID, 0) + 1);
                             }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
index f31fac10..9c5c3749 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
@@ -39,7 +39,7 @@ public interface JobAutoScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     /**
      * Called when the job is deleted.
      *
-     * @param jobKey Job key.
+     * @param context Job context.
      */
-    void cleanup(KEY jobKey);
+    void cleanup(Context context);
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index f3af3baa..d0e29e67 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -118,12 +118,17 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
     }
 
     @Override
-    public void cleanup(KEY jobKey) {
+    public void cleanup(Context ctx) {
         LOG.info("Cleaning up autoscaling meta data");
-        metricsCollector.cleanup(jobKey);
-        lastEvaluatedMetrics.remove(jobKey);
-        flinkMetrics.remove(jobKey);
-        stateStore.removeInfoFromCache(jobKey);
+        metricsCollector.cleanup(ctx.getJobKey());
+        lastEvaluatedMetrics.remove(ctx.getJobKey());
+        flinkMetrics.remove(ctx.getJobKey());
+        try {
+            stateStore.clearAll(ctx);
+            stateStore.flush(ctx);
+        } catch (Exception e) {
+            LOG.error("Error cleaning up autoscaling meta data for {}", 
ctx.getJobKey(), e);
+        }
     }
 
     @VisibleForTesting
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java
index a7a66f14..91ebfd0c 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java
@@ -24,5 +24,5 @@ public class NoopJobAutoscaler<KEY, Context extends 
JobAutoScalerContext<KEY>>
     public void scale(Context context) throws Exception {}
 
     @Override
-    public void cleanup(KEY jobKey) {}
+    public void cleanup(Context jobKey) {}
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 15581b75..64e4dd5b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -93,7 +93,4 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
      * was changed through this interface.
      */
     void flush(Context jobContext) throws Exception;
-
-    /** Clean up all information related to the current job. */
-    void removeInfoFromCache(KEY jobKey);
 }
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index cf6aa74f..c660f73b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.DelayedScaleDown;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.ScalingSummary;
@@ -34,6 +35,7 @@ import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 
 /**
  * State store based on the Java Heap, the state will be discarded after 
process restarts.
@@ -177,10 +179,16 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
         // The InMemory state store doesn't persist data.
     }
 
-    @Override
-    public void removeInfoFromCache(KEY jobKey) {
-        scalingHistoryStore.remove(jobKey);
-        collectedMetricsStore.remove(jobKey);
-        parallelismOverridesStore.remove(jobKey);
+    @VisibleForTesting
+    public boolean hasDataFor(Context jobContext) {
+        var k = jobContext.getJobKey();
+        return Stream.of(
+                        scalingHistoryStore,
+                        parallelismOverridesStore,
+                        collectedMetricsStore,
+                        tmConfigOverrides,
+                        scalingTrackingStore,
+                        delayedScaleDownStore)
+                .anyMatch(m -> m.containsKey(k));
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index c9d74e65..552971d7 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.metrics.TestMetrics;
 import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
-import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
@@ -72,7 +71,7 @@ public class JobAutoScalerImplTest {
     private JobAutoScalerContext<JobID> context;
     private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> 
scalingRealizer;
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
-    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
+    private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> 
stateStore;
 
     @BeforeEach
     public void setup() {
@@ -232,7 +231,12 @@ public class JobAutoScalerImplTest {
     void testParallelismOverrides() throws Exception {
         var autoscaler =
                 new JobAutoScalerImpl<>(
-                        null, null, null, eventCollector, scalingRealizer, 
stateStore);
+                        new TestingMetricsCollector<>(new JobTopology()),
+                        null,
+                        null,
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
 
         // Initially we should return empty overrides, do not crate any state
         assertThat(autoscaler.getParallelismOverrides(context)).isEmpty();
@@ -282,6 +286,17 @@ public class JobAutoScalerImplTest {
         context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "asd");
         autoscaler.scale(context);
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
+        context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true");
+        autoscaler.scale(context);
+        assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
+        // Make sure cleanup removes everything
+        assertTrue(stateStore.hasDataFor(context));
+        autoscaler.cleanup(context);
+        assertFalse(stateStore.hasDataFor(context));
+        autoscaler.scale(context);
+        assertParallelismOverrides(null);
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
index 1059b96a..da445973 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java
@@ -266,11 +266,6 @@ public class KubernetesAutoScalerStateStore
         configMapStore.flush(jobContext);
     }
 
-    @Override
-    public void removeInfoFromCache(ResourceID resourceID) {
-        configMapStore.removeInfoFromCache(resourceID);
-    }
-
     @SneakyThrows
     protected static String serializeScalingHistory(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 757e1a7c..3b2048c9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -276,7 +276,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
     @Override
     public DeleteControl cleanup(FlinkResourceContext<CR> ctx) {
-        autoscaler.cleanup(ResourceID.fromResource(ctx.getResource()));
+        autoscaler.cleanup(ctx.getJobAutoScalerContext());
         return cleanupInternal(ctx);
     }
 

Reply via email to