This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 5c76d01c79a91a255b40e0f08d384d7b0326bea3 Author: 1996fanrui <[email protected]> AuthorDate: Mon Sep 18 16:04:50 2023 +0800 [FLINK-33097][autoscaler] Initialize the generic autoscaler module and interfaces --- flink-autoscaler/pom.xml | 51 +++++++++++++++ .../org/apache/flink/autoscaler/JobAutoScaler.java | 35 ++++++++++ .../flink/autoscaler/JobAutoScalerContext.java | 62 ++++++++++++++++++ .../autoscaler/event/AutoScalerEventHandler.java | 55 ++++++++++++++++ .../flink/autoscaler/realizer/ScalingRealizer.java | 36 +++++++++++ .../autoscaler/state/AutoScalerStateStore.java | 74 ++++++++++++++++++++++ pom.xml | 1 + 7 files changed, 314 insertions(+) diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml new file mode 100644 index 00000000..709e5cca --- /dev/null +++ b/flink-autoscaler/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes-operator-parent</artifactId> + <version>1.7-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-autoscaler</artifactId> + <name>Flink Autoscaler</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + +</project> diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java new file mode 100644 index 00000000..ff2b7331 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Internal; + +/** + * The generic autoscaler, which scales the job described by a {@link JobAutoScalerContext}. + * + * @param <KEY> The type of the key that identifies a job. + */ +@Internal +public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> { + + /** Scales the job described by {@code context}; called as part of the reconciliation loop. */ + void scale(Context context) throws Exception; + + /** Called when the job identified by {@code key} is deleted. */ + void cleanup(KEY key); +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java new file mode 100644 index 00000000..9b60291c --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SupplierWithException; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +import javax.annotation.Nullable; + +/** + * The job autoscaler context, which includes all details related to the current job. + * + * @param <KEY> The type of the key that identifies a job. + */ +@Experimental +@AllArgsConstructor +@ToString +public class JobAutoScalerContext<KEY> { + + /** The key that identifies the Flink job. */ + @Getter private final KEY jobKey; + + /** The jobID and jobStatus can be null when the job hasn't been started. */ + @Nullable @Getter private final JobID jobID; + + @Nullable @Getter private final JobStatus jobStatus; + + @Getter private final Configuration configuration; + + @Getter private final MetricGroup metricGroup; + + @ToString.Exclude + private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier; + + public RestClusterClient<String> getRestClusterClient() throws Exception { + return restClientSupplier.get(); + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java new file mode 100644 index 00000000..9fafc686 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import javax.annotation.Nullable; + +import java.time.Duration; + +/** + * Handles all loggable events during scaling. + * + * @param <KEY> The type of the key that identifies a job. + * @param <Context> The type of the job context, a subtype of {@link JobAutoScalerContext}. + */ +@Experimental +public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> { + + /** + * Handles an event for the job described by {@code context}. + * + * @param interval When interval is greater than 0, events that repeat within the interval will + * be ignored. May be null. + */ + void handleEvent( + Context context, + Type type, + String reason, + String message, + @Nullable String messageKey, + @Nullable Duration interval); + + /** The type of the events. */ + enum Type { + Normal, + Warning + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java new file mode 100644 index 00000000..36184dad --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.realizer; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; + +import java.util.Map; + +/** + * The scaling realizer is responsible for applying scaling actions to a job. + * + * @param <KEY> The type of the key that identifies a job. + * @param <Context> The type of the job context, a subtype of {@link JobAutoScalerContext}. + */ +@Experimental +public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> { + + /** Updates the job's parallelism according to {@code parallelismOverrides}. */ + void realize(Context context, Map<String, String> parallelismOverrides); +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java new file mode 100644 index 00000000..4551ef54 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; + +/** + * The state store is responsible for storing all state during scaling. + * + * @param <KEY> The type of the key that identifies a job. + * @param <Context> The type of the job context, a subtype of {@link JobAutoScalerContext}.
+ */ +@Experimental +public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> { + + void storeScalingHistory( + Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) + throws Exception; + + Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory( + Context jobContext) throws Exception; + + void removeScalingHistory(Context jobContext) throws Exception; + + void storeEvaluatedMetrics( + Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics) + throws Exception; + + Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext) + throws Exception; + + void removeEvaluatedMetrics(Context jobContext) throws Exception; + + void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides) + throws Exception; + + Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception; + + void removeParallelismOverrides(Context jobContext) throws Exception; + + /** + * Flushes the cached data to the physical storage. Flushing is needed because all store + * methods only save data in a cache; to reduce write operations, the cached data is flushed + * only after all operations have been performed. + */ + void flush(Context jobContext) throws Exception; + + /** Cleans up all cached information related to the job identified by {@code jobKey}. */ + void removeInfoFromCache(KEY jobKey); +} diff --git a/pom.xml b/pom.xml index d90c72de..f18a417e 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ under the License. <module>flink-kubernetes-operator-api</module> <module>flink-kubernetes-webhook</module> <module>flink-kubernetes-docs</module> + <module>flink-autoscaler</module> <module>examples/flink-sql-runner-example</module> <module>examples/flink-beam-example</module> <module>examples/kubernetes-client-examples</module>
