mxm commented on code in PR #677:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1350028811


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -166,6 +197,13 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .stringType()
                     .asList()
                     .defaultValues()
+                    
.withDeprecatedKeys(deprecatedOperatorConfigKey("vertex.exclude.ids"))
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
+
+    public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
+            autoScalerConfig("flink.client.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The timeout for waiting the flink rest 
client to return.");

Review Comment:
   Do we need to expose this as a configuration or can we just use the default? 
   
   I think it is better to let the user configure standard Flink configs as 
listed here: 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#client-timeout



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -15,149 +15,180 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+    public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+    public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+    private static String deprecatedOperatorConfigKey(String key) {
+        return DEPRECATED_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key;
+    }
+
+    private static String autoScalerConfigKey(String key) {
+        return AUTOSCALER_CONF_PREFIX + key;
+    }
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return ConfigOptions.key(autoScalerConfigKey(key));
     }
 
     public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
             autoScalerConfig("enabled")
                     .booleanType()
                     .defaultValue(false)
+                    .withDeprecatedKeys(deprecatedOperatorConfigKey("enabled"))

Review Comment:
   This might confuse some existing users because the deprecated keys will not 
appear on the configuration page. Can we add a note on the configuration page 
that we renamed the configuration prefix?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -38,42 +36,42 @@
 import java.util.Map;
 import java.util.SortedMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
-public class JobVertexScaler {
+public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
 
+    @VisibleForTesting public static final String INEFFECTIVE_SCALING = 
"IneffectiveScaling";

Review Comment:
   Can we inline this? It isn't used in tests either.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java:
##########
@@ -41,159 +37,104 @@
 import java.util.Map;
 import java.util.TreeMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
+import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
+import static 
org.apache.flink.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for vertex parallelism scaler logic. */
-@EnableKubernetesMockClient(crud = true)
 public class JobVertexScalerTest {
 
-    private JobVertexScaler vertexScaler;
-    private Configuration conf;
-
-    private KubernetesClient kubernetesClient;
-    private EventCollector eventCollector;
-
-    private FlinkDeployment flinkDep;
+    private EventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
+    private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
+    private JobAutoScalerContext<JobID> context;
 
     @BeforeEach
     public void setup() {
-        flinkDep = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(flinkDep).createOrReplace();
-        eventCollector = new EventCollector();
-        vertexScaler = new JobVertexScaler(new EventRecorder(eventCollector));
-        conf = new Configuration();
+        eventCollector = new EventCollector<>();
+        vertexScaler = new JobVertexScaler<>(eventCollector);
+        var conf = new Configuration();
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) 
Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        JobID jobID = new JobID();
+        context =
+                new JobAutoScalerContext<>(
+                        jobID, jobID, conf, new UnregisteredMetricsGroup(), 
null);
     }
 
     @Test
     public void testParallelismScaling() {
         var op = new JobVertexID();
-        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        context.getConfiguration().set(AutoScalerOptions.TARGET_UTILIZATION, 
1.);

Review Comment:
   Can we re-add `conf` above and initialize it with 
`context.getConfiguration()`?



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */
+public class EventCollector<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements AutoScalerEventHandler<KEY, Context> {
+
+    public final LinkedList<Event<KEY, Context>> events = new LinkedList<>();
+
+    public final Map<String, Event<KEY, Context>> eventMap = new 
ConcurrentHashMap<>();
+
+    @Override
+    public void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey) {
+        String eventKey =
+                generateEventKey(context, type, reason, messageKey != null ? 
messageKey : message);
+        Event<KEY, Context> event = eventMap.get(eventKey);
+        if (event == null) {
+            Event<KEY, Context> newEvent = new Event<>(context, type, reason, 
message, messageKey);
+            events.add(newEvent);
+            eventMap.put(eventKey, newEvent);
+        } else {
+            event.incrementCount();
+            events.add(event);
+        }
+    }
+
+    private String generateEventKey(Context context, Type type, String reason, 
String message) {
+        return context.getJobID() + type.name() + reason + message;
+    }
+
+    /** The collected event. */
+    public static class Event<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+        private final Context context;
+
+        private final Type type;
+
+        private final String reason;
+
+        private final String message;
+
+        @Nullable private final String messageKey;
+
+        private int count;
+
+        public Event(
+                Context context,
+                Type type,
+                String reason,
+                String message,
+                @Nullable String messageKey) {
+            this.context = context;
+            this.type = type;
+            this.reason = reason;
+            this.message = message;
+            this.messageKey = messageKey;
+            this.count = 1;
+        }
+
+        public Context getContext() {
+            return context;
+        }
+
+        public Type getType() {
+            return type;
+        }
+
+        public String getReason() {
+            return reason;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        @Nullable
+        public String getMessageKey() {
+            return messageKey;
+        }
+
+        private void incrementCount() {
+            count++;
+        }
+
+        public int getCount() {
+            return count;
+        }

Review Comment:
   These can all be replaced by `@Getter` on the corresponding fields (except 
`incrementCount`).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The config map store, it's responsible for store/get/remove the state with 
String type. */

Review Comment:
   ```suggestion
   /** The ConfigMapStore persists state in Kubernetes ConfigMaps. */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws 
Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;
+        }
+        resourceScaler.scale(autoScalerContext);

Review Comment:
   ```suggestion
           if (autoscalerDisabled(ctx)) {
               autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
               resourceScaler.scale(autoScalerContext);
           } else if (waitingForRunning(ctx.getResource().getStatus())) {
               LOG.info("Autoscaler is waiting for stable, running state");
               resourceScaler.cleanup(autoScalerContext.getJobKey());
           } else {
               resourceScaler.scale(autoScalerContext);
           }
   ```
   
   Much easier to read for me.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+/** The kubernetes job autoscaler context. */

Review Comment:
   ```suggestion
   /** An implementation of JobAutoScalerContext for Kubernetes. */
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+/**
+ * The state store is responsible for store all states during scaling.

Review Comment:
   ```suggestion
    * The state store is responsible for storing all state during scaling.
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -15,22 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler.deployment;
+package org.apache.flink.autoscaler.event;
 
-import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-/** An autoscaler implementation which does nothing. */
-public class NoopJobAutoscalerFactory implements JobAutoScalerFactory, 
JobAutoScaler {
+import javax.annotation.Nullable;
 
-    @Override
-    public JobAutoScaler create(EventRecorder eventRecorder) {
-        return this;
-    }
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
-    @Override
-    public void scale(FlinkResourceContext<?> ctx) {}
+    void handleEvent(
+            Context context, Type type, String reason, String message, 
@Nullable String messageKey);
 
-    @Override
-    public void cleanup(FlinkResourceContext<?> ctx) {}
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Warning

Review Comment:
   What about `Error`?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,7 +155,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> 
evaluateMetrics(
     }
 
     @VisibleForTesting
-    protected static void computeProcessingRateThresholds(
+    public static void computeProcessingRateThresholds(

Review Comment:
   Any reason for changing the visibility here? I couldn't find why this was 
required.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = 
"AutoscalerError";

Review Comment:
   Can we inline this constant? This isn't used in tests either.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
-        restart(now);
-
-        // after restart while the job is not running the evaluated metrics 
are gone
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(3, getOrCreateInfo(app, 
kubernetesClient).getMetricHistory().size());
-        
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
-        scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
-        assertEquals(4, scaledParallelism.get(source));
-        assertEquals(4, scaledParallelism.get(sink));
-

Review Comment:
   Why did we remove this check?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+/**
+ * The state store is responsible for store all states during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(
+            Context jobContext, Map<JobVertexID, SortedMap<Instant, 
ScalingSummary>> scalingHistory)
+            throws Exception;
+
+    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
getScalingHistory(
+            Context jobContext) throws Exception;
+
+    void removeScalingHistory(Context jobContext) throws Exception;
+
+    void storeEvaluatedMetrics(
+            Context jobContext, SortedMap<Instant, CollectedMetrics> 
evaluatedMetrics)
+            throws Exception;
+
+    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context 
jobContext)
+            throws Exception;
+
+    void removeEvaluatedMetrics(Context jobContext) throws Exception;
+
+    void storeParallelismOverrides(Context jobContext, Map<String, String> 
parallelismOverrides)
+            throws Exception;
+
+    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) 
throws Exception;
+
+    void removeParallelismOverrides(Context jobContext) throws Exception;
+
+    /**
+     * The flush is needed because we just save data in cache for all store 
methods, and flush these
+     * data to the physical storage after the flush method is called to 
improve the performance.

Review Comment:
   ```suggestion
        * Flushing is needed because we just save data in cache for all store 
methods. 
        * For fewer write operations, we flush the cached data to the physical 
storage 
        * only after all operations have been performed.
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new 
TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void 
updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> 
!vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are 
remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        
AutoScalerOptions
+                                                                               
 .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   I think this has caused issues in the past. It is better to either use a new 
TreeMap (versus a subset) or clear old entries like this:
   
   ```java
                                                   .headMap(
                                                           now.minus(
                                                                   conf.get(
                                                                           
AutoScalerOptions
                                                                                
   .VERTEX_SCALING_HISTORY_AGE)))
                                                           .clear();
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java:
##########
@@ -217,73 +198,38 @@ public void endToEnd() throws Exception {
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
-        restart(now);
-
-        // after restart while the job is not running the evaluated metrics 
are gone
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(3, getOrCreateInfo(app, 
kubernetesClient).getMetricHistory().size());
-        
assertNull(autoscaler.lastEvaluatedMetrics.get(ResourceID.fromResource(app)));
-        scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
-        assertEquals(4, scaledParallelism.get(source));
-        assertEquals(4, scaledParallelism.get(sink));
-
-        now = now.plus(Duration.ofSeconds(1));
-        setClocksTo(now);
-        running(now);
 
-        // once the job is running we got back the evaluated metric except the 
recommended
-        // parallelisms (until the metric window is full again)
-        autoscaler.scale(getResourceContext(app, ctx));
-        assertEquals(1, getOrCreateInfo(app, 
kubernetesClient).getMetricHistory().size());
+        autoscaler.scale(context);
+        assertEvaluatedMetricsSize(1);
         assertEquals(4., getCurrentMetricValue(source, PARALLELISM));
         assertEquals(4., getCurrentMetricValue(sink, PARALLELISM));
-        assertNull(getCurrentMetricValue(source, RECOMMENDED_PARALLELISM));
-        assertNull(getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
-        scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(kubernetesClient, app);
+        assertEquals(4., getCurrentMetricValue(source, 
RECOMMENDED_PARALLELISM));
+        assertEquals(4., getCurrentMetricValue(sink, RECOMMENDED_PARALLELISM));
+        scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.get(source));
         assertEquals(4, scaledParallelism.get(sink));
     }
 
+    private void assertEvaluatedMetricsSize(int expectedSize) {
+        Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
+                stateStore.getEvaluatedMetrics(context);
+        assertThat(evaluatedMetricsOpt).isPresent();
+        assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
+    }
+
     private Double getCurrentMetricValue(JobVertexID jobVertexID, 
ScalingMetric scalingMetric) {
         var metric =
                 autoscaler
                         .lastEvaluatedMetrics
-                        .get(ResourceID.fromResource(app))
+                        .get(context.getJobKey())
                         .get(jobVertexID)
                         .get(scalingMetric);
         return metric == null ? null : metric.getCurrent();
     }
 
-    private void restart(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.CREATED.name());
-    }
-
-    private void running(Instant now) {
-        metricsCollector.setJobUpdateTs(now);
-        app.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
-    }
-

Review Comment:
   Why did we remove the Job status changes and only set the job update time 
above?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/AutoscalerLoader.java:
##########
@@ -17,41 +17,37 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
-import 
org.apache.flink.kubernetes.operator.reconciler.deployment.NoopJobAutoscalerFactory;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ServiceLoader;
-
-/** Loads the active Autoscaler implementation from the classpath. */
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.autoscaler.RestApiMetricsCollector;
+import org.apache.flink.autoscaler.ScalingExecutor;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.kubernetes.operator.autoscaler.ConfigMapStore;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerEventHandler;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesAutoScalerStateStore;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
+import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesScalingRealizer;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+/** Loads the Autoscaler implementation. */
 public class AutoscalerLoader {

Review Comment:
   Given that nothing is dynamically loaded anymore, this class should either 
be removed or renamed to `AutoscalerFactory`.



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/realizer/ScalingRealizerCollector.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */

Review Comment:
   ```suggestion
   /** The event handler for collecting scaling events. */
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = 
"AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new 
ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            runScalingLogic(ctx, autoscalerMetrics);
+        } catch (Throwable e) {
+            onError(ctx, autoscalerMetrics, e);
+        } finally {
+            applyParallelismOverrides(ctx);
+        }
+    }
+
+    @Override
+    public void cleanup(KEY jobKey) {
+        LOG.info("Cleaning up autoscaling meta data");
+        metricsCollector.cleanup(jobKey);
+        lastEvaluatedMetrics.remove(jobKey);
+        flinkMetrics.remove(jobKey);
+        stateStore.removeInfoFromCache(jobKey);
+    }
+
+    private void clearParallelismOverrides(Context ctx) throws Exception {
+        var parallelismOverrides = stateStore.getParallelismOverrides(ctx);
+        if (parallelismOverrides.isPresent()) {
+            stateStore.removeParallelismOverrides(ctx);
+            stateStore.flush(ctx);
+        }
+    }
+
+    @VisibleForTesting
+    protected Optional<Map<String, String>> getParallelismOverrides(Context 
ctx) throws Exception {
+        return stateStore.getParallelismOverrides(ctx);
+    }
+
+    /**
+     * If there are any parallelism overrides by the {@link JobAutoScaler} 
apply them to the
+     * scalingRealizer.
+     *
+     * @param ctx Job context
+     */
+    @VisibleForTesting
+    protected void applyParallelismOverrides(Context ctx) throws Exception {
+        var overridesOpt = getParallelismOverrides(ctx);
+        if (overridesOpt.isEmpty() || overridesOpt.get().isEmpty()) {
+            return;
+        }
+        Map<String, String> overrides = overridesOpt.get();
+        LOG.debug("Applying parallelism overrides: {}", overrides);
+
+        var conf = ctx.getConfiguration();
+        var userOverrides = new 
HashMap<>(conf.get(PipelineOptions.PARALLELISM_OVERRIDES));
+        var exclusions = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
+
+        overrides.forEach(
+                (k, v) -> {
+                    // Respect user override for excluded vertices
+                    if (exclusions.contains(k)) {
+                        userOverrides.putIfAbsent(k, v);
+                    } else {
+                        userOverrides.put(k, v);
+                    }
+                });
+        scalingRealizer.realize(ctx, userOverrides);
+    }
+
+    private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics 
autoscalerMetrics)
+            throws Exception {
+
+        var conf = ctx.getConfiguration();
+        if (!conf.getBoolean(AUTOSCALER_ENABLED)) {
+            LOG.debug("Autoscaler is disabled");
+            clearParallelismOverrides(ctx);
+            return;
+        }
+
+        var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
+
+        if (collectedMetrics.getMetricHistory().isEmpty()) {
+            stateStore.flush(ctx);
+            return;
+        }
+        LOG.debug("Collected metrics: {}", collectedMetrics);
+
+        var evaluatedMetrics = evaluator.evaluate(conf, collectedMetrics);
+        LOG.debug("Evaluated metrics: {}", evaluatedMetrics);
+        lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics);
+
+        initRecommendedParallelism(evaluatedMetrics);
+        autoscalerMetrics.registerScalingMetrics(
+                
collectedMetrics.getJobTopology().getVerticesInTopologicalOrder(),
+                () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
+
+        if (!collectedMetrics.isFullyCollected()) {
+            // We have done an upfront evaluation, but we are not ready for 
scaling.
+            resetRecommendedParallelism(evaluatedMetrics);
+            stateStore.flush(ctx);
+            return;
+        }
+
+        var parallelismChanged = scalingExecutor.scaleResource(ctx, 
evaluatedMetrics);
+
+        if (parallelismChanged) {
+            autoscalerMetrics.incrementScaling();
+        } else {
+            autoscalerMetrics.incrementBalanced();
+        }
+
+        stateStore.flush(ctx);

Review Comment:
   Can we run the flush right after the call to this method (after line 85)?



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Tests for {@link RestApiMetricsCollector}. */
+public class RestApiMetricsCollectorTest {
+
+    @Test
+    public void testAggregateMultiplePendingRecordsMetricsPerSource() throws 
Exception {
+        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
+                new RestApiMetricsCollector<>();
+
+        JobVertexID jobVertexID = new JobVertexID();
+        Map<String, FlinkMetric> flinkMetrics =
+                Map.of(
+                        "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
+                        "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
+        Map<JobVertexID, Map<String, FlinkMetric>> metrics = 
Map.of(jobVertexID, flinkMetrics);
+
+        List<AggregatedMetric> aggregatedMetricsResponse =
+                List.of(
+                        new AggregatedMetric(
+                                "a.pendingRecords", Double.NaN, Double.NaN, 
Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "b.pendingRecords", Double.NaN, Double.NaN, 
Double.NaN, 100.),
+                        new AggregatedMetric(
+                                "c.unrelated", Double.NaN, Double.NaN, 
Double.NaN, 100.));
+
+        Configuration conf = new Configuration();
+        RestClusterClient<String> restClusterClient =
+                new RestClusterClient<>(
+                        conf,
+                        "test-cluster",
+                        (c, e) -> new StandaloneClientHAServices("localhost")) 
{
+                    @Override
+                    public <
+                                    M extends MessageHeaders<R, P, U>,
+                                    U extends MessageParameters,
+                                    R extends RequestBody,
+                                    P extends ResponseBody>
+                            CompletableFuture<P> sendRequest(
+                                    M messageHeaders, U messageParameters, R 
request) {
+                        if (messageHeaders instanceof 
AggregatedSubtaskMetricsHeaders) {
+                            return (CompletableFuture<P>)
+                                    CompletableFuture.completedFuture(
+                                            new AggregatedMetricsResponseBody(
+                                                    
aggregatedMetricsResponse));
+                        }
+                        return (CompletableFuture<P>)
+                                
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+                    }
+                };
+
+        JobID jobID = new JobID();
+        JobAutoScalerContext<JobID> context =
+                new JobAutoScalerContext<>(
+                        jobID,
+                        jobID,
+                        conf,
+                        new UnregisteredMetricsGroup(),
+                        () -> restClusterClient);
+
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap 
=
+                collector.queryAllAggregatedMetrics(context, metrics);
+
+        System.out.println(jobVertexIDMapMap);

Review Comment:
   Is this `println` a leftover?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.SneakyThrows;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.LoaderOptions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/** The kubernetes autoscaler state store, it's based on the config map. */

Review Comment:
   ```suggestion
   /** An AutoscalerStateStore which persists its state in Kubernetes 
ConfigMaps. */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import javax.annotation.Nullable;
+
+/** The kubernetes autoscaler event handler, it's based on the {@link 
EventRecorder}. */

Review Comment:
   ```suggestion
   /** An event handler which posts events to the Kubernetes events API. */
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/EventCollector.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The event handler with collect events.
+ *
+ * @param <KEY>
+ * @param <Context>
+ */

Review Comment:
   ```suggestion
   /** The event handler for collecting events. */
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java:
##########
@@ -47,6 +50,36 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
     private FlinkOperatorConfiguration operatorConfig;
     private Configuration observeConfig;
     private FlinkService flinkService;
+    private KubernetesJobAutoScalerContext autoScalerContext;
+
+    /**
+     * Get or create the autoscaler context.
+     *
+     * @return autoScalerContext.
+     */

Review Comment:
   ```suggestion
   ```



##########
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/state/FlushCountableAutoscalerStateStore.java:
##########
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.state;
 
-import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
-import lombok.SneakyThrows;
+/** The state store counts the flush. */
+public class FlushCountableAutoscalerStateStore<KEY, Context extends 
JobAutoScalerContext<KEY>>
+        extends InMemoryAutoScalerStateStore<KEY, Context> {
 
-/** Autoscaler test utilities. * */
-public class AutoscalerTestUtils {
+    private int flushCount = 0;

Review Comment:
   ```suggestion
       private int flushCount;
   ```
   
   Int fields are zero by default.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java:
##########
@@ -17,24 +17,26 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
-import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
-import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.PipelineOptions;
 
-import com.google.auto.service.AutoService;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import java.util.Map;
+
+/** The kubernetes scaling realizer. */

Review Comment:
   ```suggestion
   /** The Kubernetes implementation for applying parallelism overrides. */
   ```



##########
flink-autoscaler/pom.xml:
##########
@@ -45,6 +45,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These jackson dependencies can be replaced with 
flink shaded jackson. It can be done

Review Comment:
   Yep, let's use the version bundled with the client.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -174,6 +182,32 @@ public void reconcile(FlinkResourceContext<CR> ctx) throws 
Exception {
         }
     }
 
+    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
+        KubernetesJobAutoScalerContext autoScalerContext = 
ctx.getJobAutoScalerContext();
+
+        if (autoscalerDisabled(ctx)) {
+            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, 
false);
+            resourceScaler.scale(autoScalerContext);
+            return;
+        }
+        if (waitingForRunning(ctx.getResource().getStatus())) {
+            LOG.info("Autoscaler is waiting for  stable, running state");
+            resourceScaler.cleanup(autoScalerContext.getJobKey());
+            return;

Review Comment:
   We can internally reset the state, but the effective overrides on the 
deployment should not be cleared during the waiting period. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to