This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 5c76d01c79a91a255b40e0f08d384d7b0326bea3
Author: 1996fanrui <[email protected]>
AuthorDate: Mon Sep 18 16:04:50 2023 +0800

    [FLINK-33097][autoscaler] Initialize the generic autoscaler module and 
interfaces
---
 flink-autoscaler/pom.xml                           | 51 +++++++++++++++
 .../org/apache/flink/autoscaler/JobAutoScaler.java | 35 ++++++++++
 .../flink/autoscaler/JobAutoScalerContext.java     | 62 ++++++++++++++++++
 .../autoscaler/event/AutoScalerEventHandler.java   | 55 ++++++++++++++++
 .../flink/autoscaler/realizer/ScalingRealizer.java | 36 +++++++++++
 .../autoscaler/state/AutoScalerStateStore.java     | 74 ++++++++++++++++++++++
 pom.xml                                            |  1 +
 7 files changed, 314 insertions(+)

diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
new file mode 100644
index 00000000..709e5cca
--- /dev/null
+++ b/flink-autoscaler/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-kubernetes-operator-parent</artifactId>
+        <version>1.7-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-autoscaler</artifactId>
+    <name>Flink Autoscaler</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
new file mode 100644
index 00000000..ff2b7331
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The generic Autoscaler.
+ *
+ * @param <KEY> The job key.
+ */
+@Internal
+public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> 
{
+
+    /** Called as part of the reconciliation loop. */
+    void scale(Context context) throws Exception;
+
+    /** Called when the job is deleted. */
+    void cleanup(KEY key);
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
new file mode 100644
index 00000000..9b60291c
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SupplierWithException;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+import javax.annotation.Nullable;
+
+/**
+ * The job autoscaler context, it includes all details related to the current 
job.
+ *
+ * @param <KEY> The job key.
+ */
+@Experimental
+@AllArgsConstructor
+@ToString
+public class JobAutoScalerContext<KEY> {
+
+    /** The identifier of each flink job. */
+    @Getter private final KEY jobKey;
+
+    /** The jobId and jobStatus can be null when the job isn't started. */
+    @Nullable @Getter private final JobID jobID;
+
+    @Nullable @Getter private final JobStatus jobStatus;
+
+    @Getter private final Configuration configuration;
+
+    @Getter private final MetricGroup metricGroup;
+
+    @ToString.Exclude
+    private final SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier;
+
+    public RestClusterClient<String> getRestClusterClient() throws Exception {
+        return restClientSupplier.get();
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
new file mode 100644
index 00000000..9fafc686
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+
+/**
+ * Handler all loggable events during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerEventHandler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+
+    /**
+     * Handle the event.
+     *
+     * @param interval When interval is great than 0, events that repeat 
within the interval will be
+     *     ignored.
+     */
+    void handleEvent(
+            Context context,
+            Type type,
+            String reason,
+            String message,
+            @Nullable String messageKey,
+            @Nullable Duration interval);
+
+    /** The type of the events. */
+    enum Type {
+        Normal,
+        Warning
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
new file mode 100644
index 00000000..36184dad
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.realizer;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+
+import java.util.Map;
+
+/**
+ * The Scaling Realizer is responsible for managing scaling actions.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface ScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+
+    /** Update job's parallelism to parallelismOverrides. */
+    void realize(Context context, Map<String, String> parallelismOverrides);
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
new file mode 100644
index 00000000..4551ef54
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+
+/**
+ * The state store is responsible for storing all state during scaling.
+ *
+ * @param <KEY> The job key.
+ * @param <Context> Instance of JobAutoScalerContext.
+ */
+@Experimental
+public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<KEY>> {
+
+    void storeScalingHistory(
+            Context jobContext, Map<JobVertexID, SortedMap<Instant, 
ScalingSummary>> scalingHistory)
+            throws Exception;
+
+    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> 
getScalingHistory(
+            Context jobContext) throws Exception;
+
+    void removeScalingHistory(Context jobContext) throws Exception;
+
+    void storeEvaluatedMetrics(
+            Context jobContext, SortedMap<Instant, CollectedMetrics> 
evaluatedMetrics)
+            throws Exception;
+
+    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context 
jobContext)
+            throws Exception;
+
+    void removeEvaluatedMetrics(Context jobContext) throws Exception;
+
+    void storeParallelismOverrides(Context jobContext, Map<String, String> 
parallelismOverrides)
+            throws Exception;
+
+    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) 
throws Exception;
+
+    void removeParallelismOverrides(Context jobContext) throws Exception;
+
+    /**
+     * Flushing is needed because we just save data in cache for all store 
methods. For less write
+     * operations, we flush the cached data to the physical storage only after 
all operations have
+     * been performed.
+     */
+    void flush(Context jobContext) throws Exception;
+
+    /** Clean up all information related to the current job. */
+    void removeInfoFromCache(KEY jobKey);
+}
diff --git a/pom.xml b/pom.xml
index d90c72de..f18a417e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@ under the License.
         <module>flink-kubernetes-operator-api</module>
         <module>flink-kubernetes-webhook</module>
         <module>flink-kubernetes-docs</module>
+        <module>flink-autoscaler</module>
         <module>examples/flink-sql-runner-example</module>
         <module>examples/flink-beam-example</module>
         <module>examples/kubernetes-client-examples</module>

Reply via email to