mxm commented on code in PR #677:
URL: https://github.com/apache/flink-kubernetes-operator/pull/677#discussion_r1355175979


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handles all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is greater than 0, events that repeat within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Error

Review Comment:
   Why not Normal, _Warning_, Error?
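
   A minimal sketch of the variant this question points at; the `Warning` constant is hypothetical and not part of the diff above:

   ```java
   /** The type of the events. */
   enum Type {
       Normal,
       Warning, // hypothetical addition raised in this comment; not in the current diff
       Error
   }
   ```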



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+
+/** The default implementation of {@link JobAutoScaler}. */
+public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>>
+        implements JobAutoScaler<KEY, Context> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
+
+    @VisibleForTesting protected static final String AUTOSCALER_ERROR = "AutoscalerError";
+
+    private final ScalingMetricCollector<KEY, Context> metricsCollector;
+    private final ScalingMetricEvaluator evaluator;
+    private final ScalingExecutor<KEY, Context> scalingExecutor;
+    private final AutoScalerEventHandler<KEY, Context> eventHandler;
+    private final ScalingRealizer<KEY, Context> scalingRealizer;
+    private final AutoScalerStateStore<KEY, Context> stateStore;
+
+    @VisibleForTesting
+    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
+            lastEvaluatedMetrics = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap<>();
+
+    public JobAutoScalerImpl(
+            ScalingMetricCollector<KEY, Context> metricsCollector,
+            ScalingMetricEvaluator evaluator,
+            ScalingExecutor<KEY, Context> scalingExecutor,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            ScalingRealizer<KEY, Context> scalingRealizer,
+            AutoScalerStateStore<KEY, Context> stateStore) {
+        this.metricsCollector = metricsCollector;
+        this.evaluator = evaluator;
+        this.scalingExecutor = scalingExecutor;
+        this.eventHandler = eventHandler;
+        this.scalingRealizer = scalingRealizer;
+        this.stateStore = stateStore;
+    }
+
+    @Override
+    public void scale(Context ctx) throws Exception {
+        var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx);
+
+        try {
+            if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
+                LOG.debug("Autoscaler is disabled");
+                clearParallelismOverrides(ctx);
+                return;
+            }
+
+            if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                lastEvaluatedMetrics.remove(ctx.getJobKey());
+                return;
+            }
+
+            runScalingLogic(ctx, autoscalerMetrics);
+            stateStore.flush(ctx);

Review Comment:
   This no longer flushes after line 88 and line 93, but it did before.
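
   One hedged way to restore the previous behaviour, reusing the identifiers from the diff above; the extra `flush()` calls are hypothetical and assume flushing is safe to do in these branches:

   ```java
   // Hypothetical variant of scale(Context ctx): keep flushing in the early-return
   // branches so that pending state changes are still persisted, as they were before.
   if (!ctx.getConfiguration().getBoolean(AUTOSCALER_ENABLED)) {
       LOG.debug("Autoscaler is disabled");
       clearParallelismOverrides(ctx);
       stateStore.flush(ctx); // hypothetical: flush before returning early
       return;
   }

   if (ctx.getJobStatus() != JobStatus.RUNNING) {
       lastEvaluatedMetrics.remove(ctx.getJobKey());
       stateStore.flush(ctx); // hypothetical: flush before returning early
       return;
   }
   ```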



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingHistoryUtils.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.state.AutoScalerStateStore;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import javax.annotation.Nonnull;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** The utils for scaling history. */
+public class ScalingHistoryUtils {
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+        addToScalingHistoryAndStore(
+                stateStore,
+                context,
+                getTrimmedScalingHistory(stateStore, context, now),
+                now,
+                summaries);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void addToScalingHistoryAndStore(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context context,
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
+            Instant now,
+            Map<JobVertexID, ScalingSummary> summaries)
+            throws Exception {
+
+        summaries.forEach(
+                (id, summary) ->
+                        scalingHistory.computeIfAbsent(id, j -> new TreeMap<>()).put(now, summary));
+        stateStore.storeScalingHistory(context, scalingHistory);
+    }
+
+    public static <KEY, Context extends JobAutoScalerContext<KEY>> void updateVertexList(
+            AutoScalerStateStore<KEY, Context> stateStore,
+            Context ctx,
+            Instant now,
+            Set<JobVertexID> vertexSet)
+            throws Exception {
+        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> trimmedScalingHistory =
+                getTrimmedScalingHistory(stateStore, ctx, now);
+
+        if (trimmedScalingHistory.keySet().removeIf(v -> !vertexSet.contains(v))) {
+            stateStore.storeScalingHistory(ctx, trimmedScalingHistory);
+        }
+    }
+
+    @Nonnull
+    public static <KEY, Context extends JobAutoScalerContext<KEY>>
+            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> getTrimmedScalingHistory(
+                    AutoScalerStateStore<KEY, Context> autoScalerStateStore,
+                    Context context,
+                    Instant now)
+                    throws Exception {
+        var conf = context.getConfiguration();
+        return autoScalerStateStore
+                .getScalingHistory(context)
+                .map(
+                        scalingHistory -> {
+                            var entryIt = scalingHistory.entrySet().iterator();
+                            while (entryIt.hasNext()) {
+                                var entry = entryIt.next();
+                                // Limit how long past scaling decisions are remembered
+                                entry.setValue(
+                                        entry.getValue()
+                                                .tailMap(
+                                                        now.minus(
+                                                                conf.get(
+                                                                        AutoScalerOptions
+                                                                                .VERTEX_SCALING_HISTORY_AGE))));

Review Comment:
   I see! It's hard to tell what has been moved and what hasn't. I was concerned that returning a submap here could cause issues. It's better to clear the part of the map we no longer need and return the full map.
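
   A rough sketch of that alternative, reusing names from the diff above and assuming the stored per-vertex maps are mutable `TreeMap`s, so clearing a `headMap` view mutates the backing map:

   ```java
   // Instead of replacing each entry with a tailMap(...) view, trim the map in place
   // and return the full (now trimmed) map, so callers never hold a submap view.
   var cutoff = now.minus(conf.get(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE));
   for (var entry : scalingHistory.entrySet()) {
       // headMap(cutoff) is a view of all entries strictly before the cutoff;
       // clearing it removes them from the backing map.
       entry.getValue().headMap(cutoff).clear();
   }
   return scalingHistory;
   ```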


