This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch decoupling-autoscaler-k8s-v2 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 2fd674cb7c64ded9f43dd60319cc03b59350dc84 Author: 1996fanrui <[email protected]> AuthorDate: Thu Jul 20 18:58:53 2023 +0800 [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module --- flink-autoscaler/pom.xml | 52 +++++++++++++++++++++ .../flink}/autoscaler/AutoscalerFlinkMetrics.java | 24 +++++++--- .../flink}/autoscaler/ScalingMetricEvaluator.java | 53 +++++++++++----------- .../apache/flink}/autoscaler/ScalingSummary.java | 8 ++-- .../autoscaler/config/AutoScalerOptions.java | 10 ++-- .../autoscaler/metrics/CollectedMetricHistory.java | 4 +- .../autoscaler/metrics/CollectedMetrics.java | 2 +- .../org/apache/flink}/autoscaler/metrics/Edge.java | 2 +- .../autoscaler/metrics/EvaluatedScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/FlinkMetric.java | 2 +- .../autoscaler/metrics/MetricAggregator.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetrics.java | 8 ++-- .../flink}/autoscaler/topology/JobTopology.java | 2 +- .../flink}/autoscaler/topology/VertexInfo.java | 2 +- .../autoscaler/utils/AutoScalerSerDeModule.java | 4 +- .../flink}/autoscaler/utils/AutoScalerUtils.java | 17 ++++--- .../autoscaler/ScalingMetricEvaluatorTest.java | 44 +++++++++--------- .../autoscaler/metrics/ScalingMetricsTest.java | 8 ++-- .../autoscaler/utils/AutoScalerUtilsTest.java | 4 +- .../src/test/resources/log4j2-test.properties | 26 +++++++++++ flink-kubernetes-operator-autoscaler/pom.xml | 6 +++ .../operator/autoscaler/AutoScalerInfo.java | 7 +-- .../operator/autoscaler/JobAutoScalerImpl.java | 18 ++++---- .../autoscaler/JobAutoscalerFactoryImpl.java | 1 + .../operator/autoscaler/JobVertexScaler.java | 29 ++++++------ .../autoscaler/RestApiMetricsCollector.java | 2 +- .../operator/autoscaler/ScalingExecutor.java | 19 ++++---- .../autoscaler/ScalingMetricCollector.java | 16 +++---- .../operator/autoscaler/AutoScalerInfoTest.java | 9 ++-- .../autoscaler/BacklogBasedScalingTest.java | 14 +++--- .../operator/autoscaler/JobAutoScalerImplTest.java | 12 +++-- .../operator/autoscaler/JobTopologyTest.java | 2 +- .../operator/autoscaler/JobVertexScalerTest.java | 8 ++-- .../MetricsCollectionAndEvaluationTest.java | 15 +++--- .../autoscaler/RecommendedParallelismTest.java | 15 +++--- .../autoscaler/RestApiMetricsCollectorTest.java | 2 +- .../operator/autoscaler/ScalingExecutorTest.java | 8 ++-- .../autoscaler/ScalingMetricCollectorTest.java | 6 +-- .../autoscaler/TestingMetricsCollector.java | 4 +- pom.xml | 1 + 41 files changed, 293 insertions(+), 179 deletions(-) diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml new file mode 100644 index 00000000..458ba878 --- /dev/null +++ b/flink-autoscaler/pom.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-kubernetes-operator-parent</artifactId> + <version>1.6-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-autoscaler</artifactId> + <name>Flink Autoscaler</name> + <packaging>jar</packaging> + + <properties> + <jackson.version>2.15.0</jackson.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <artifactId>jackson-dataformat-yaml</artifactId> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <version>${jackson.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java similarity index 90% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java index fab3cb32..1a0f0671 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -37,11 +37,11 @@ public class AutoscalerFlinkMetrics { private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class); - final Counter numScalings; + private final Counter numScalings; - final Counter numErrors; + private final Counter numErrors; - final Counter numBalanced; + private final Counter numBalanced; private final MetricGroup metricGroup; @@ -54,6 +54,18 @@ public class AutoscalerFlinkMetrics { this.metricGroup = metricGroup; } + public Counter getNumScalings() { + return numScalings; + } + + public Counter getNumErrors() { + return numErrors; + } + + public Counter getNumBalanced() { + return numBalanced; + } + public void registerScalingMetrics( Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> currentVertexMetrics) { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java similarity index 82% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index af6d1732..7cb0bf1c 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -15,44 +15,45 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.commons.math3.stat.StatUtils; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Job scaling evaluator for autoscaler. */ public class ScalingMetricEvaluator { @@ -110,7 +111,7 @@ public class ScalingMetricEvaluator { }); } - @NotNull + @Nonnull private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics( Configuration conf, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput, @@ -154,7 +155,7 @@ public class ScalingMetricEvaluator { } @VisibleForTesting - protected static void computeProcessingRateThresholds( + public static void computeProcessingRateThresholds( Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf, boolean processingBacklog) { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java similarity index 85% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java index 4ff7a6e9..50a3d17b 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; @@ -26,7 +26,7 @@ import lombok.NoArgsConstructor; import java.util.Map; -/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */ +/** Scaling summary returned by the {@link org.apache.flink.autoscaler.ScalingMetricEvaluator}. */ @Data @NoArgsConstructor public class ScalingSummary { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java similarity index 96% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index b31a9bec..68f64467 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.config; +package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator; import java.time.Duration; import java.util.List; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; - /** Config options related to the autoscaler module. */ public class AutoScalerOptions { + public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator."; + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { - return operatorConfig("job.autoscaler." + key); + return ConfigOptions.key(K8S_OP_CONF_PREFIX + "job.autoscaler." + key); } public static final ConfigOption<Boolean> AUTOSCALER_ENABLED = diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java similarity index 88% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java index 43bbf47b..31a19879 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.JobTopology; import lombok.Data; import lombok.Setter; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java index 97e3f9fe..bfd85ca2 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java index c89692a9..1fe938b8 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java index 23788886..c60adf18 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import lombok.Data; import lombok.NoArgsConstructor; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java index 81609775..7f28d947 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java index c79d4b7d..e147d211 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java index 608e4f1c..b6a3e17d 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; /** * Supported scaling metrics. These represent high level metrics computed from Flink job metrics diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java index 592d0602..afaa21ae 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java similarity index 99% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java index 3f160193..1f2f5a63 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java index a261ca6d..249be598 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java index fad516a5..ce991403 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.Edge; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java similarity index 82% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index 54b6bef3..176ad582 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.ArrayList; @@ -30,9 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; - /** AutoScaler utilities. */ public class AutoScalerUtils { @@ -45,7 +42,8 @@ public class AutoScalerUtils { // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL - double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent(); + double lagCatchupTargetRate = + evaluatedMetrics.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent(); if (Double.isNaN(lagCatchupTargetRate)) { return Double.NaN; } @@ -56,7 +54,8 @@ public class AutoScalerUtils { targetUtilization = Math.max(0., targetUtilization); targetUtilization = Math.min(1., targetUtilization); - double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage(); + double avgInputTargetRate = + evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE).getAverage(); if (Double.isNaN(avgInputTargetRate)) { return Double.NaN; } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java similarity index 86% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index c67b27fe..13b19445 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; @@ -39,19 +39,19 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java similarity index 98% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java index 0ae3e2b7..85a72e5a 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java index 90588994..a037258a 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; diff --git a/flink-autoscaler/src/test/resources/log4j2-test.properties b/flink-autoscaler/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000..47b66644 --- /dev/null +++ b/flink-autoscaler/src/test/resources/log4j2-test.properties @@ -0,0 +1,26 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +# Log all infos to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable} diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml index 66307e8c..4b77ee05 100644 --- a/flink-kubernetes-operator-autoscaler/pom.xml +++ b/flink-kubernetes-operator-autoscaler/pom.xml @@ -36,6 +36,12 @@ under the License. </properties> <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-autoscaler</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java index 55c0b0ac..c37b299f 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index fbefd15a..12904af4 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -36,9 +38,9 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; /** Application and SessionJob autoscaler. */ public class JobAutoScalerImpl implements JobAutoScaler { @@ -158,16 +160,16 @@ public class JobAutoScalerImpl implements JobAutoScaler { scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); if (specAdjusted) { - flinkMetrics.numScalings.inc(); + flinkMetrics.getNumScalings().inc(); } else { - flinkMetrics.numBalanced.inc(); + flinkMetrics.getNumBalanced().inc(); } autoScalerInfo.replaceInKubernetes(kubernetesClient); return specAdjusted; } catch (Throwable e) { LOG.error("Error while scaling resource", e); - flinkMetrics.numErrors.inc(); + flinkMetrics.getNumErrors().inc(); eventRecorder.triggerEvent( resource, EventRecorder.Type.Warning, diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java index 277796e5..5ebe9670 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java index 497bb270..d5351fd3 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java @@ -18,12 +18,13 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -37,16 +38,16 @@ import java.time.ZoneId; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Component responsible for computing vertex parallelism based on the scaling metrics. */ public class JobVertexScaler { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java index 158bb5a2..dca52a0c 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java @@ -18,9 +18,9 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index 8ede4d99..f2ec20d6 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -38,12 +39,12 @@ import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Class responsible for executing scaling decisions. */ public class ScalingExecutor { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java index 3ad1db39..e620fb0e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java @@ -19,17 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetrics; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java index cfc2e7f5..94ada9fb 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java @@ -17,11 +17,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index 2c76ae7d..c9e67b71 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -19,14 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -441,7 +443,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase { AutoscalerFlinkMetrics autoscalerFlinkMetrics = autoscaler.flinkMetrics.get( ResourceID.fromResource(getResourceContext(app, ctx).getResource())); - assertEquals(scalingCount, autoscalerFlinkMetrics.numScalings.getCount()); - assertEquals(balancedCount, autoscalerFlinkMetrics.numBalanced.getCount()); + assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalings().getCount()); + assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalanced().getCount()); } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java index bb46f610..f1749ad9 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java @@ -38,8 +38,8 @@ import org.junit.jupiter.api.Test; import java.util.Map; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,12 +76,14 @@ public class JobAutoScalerImplTest extends OperatorTestBase { ResourceID resourceId = ResourceID.fromResource(app); autoscaler.scale(resourceContext); - Assertions.assertEquals(1, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); + Assertions.assertEquals( + 1, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount()); autoscaler.scale(resourceContext); - Assertions.assertEquals(2, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); + Assertions.assertEquals( + 2, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount()); - assertEquals(0, autoscaler.flinkMetrics.get(resourceId).numScalings.getCount()); + assertEquals(0, autoscaler.flinkMetrics.get(resourceId).getNumScalings().getCount()); } @Test diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java index f76fadce..bd64be28 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java @@ -17,7 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java index fb4655df..82eb2e45 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java @@ -17,12 +17,14 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java index 0d973b00..f7dbb723 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -19,17 +19,18 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java index 23ee4055..5246eefe 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java @@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -52,9 +53,9 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java index 04d83107..9f3cb451 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index 2191f723..a5ada0c4 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -18,13 +18,15 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java index ddf26453..3c11a7f6 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java @@ -19,13 +19,13 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java index 9a3c1d13..277ee33b 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; diff --git a/pom.xml b/pom.xml index 330d1d7e..4ba9a6de 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ under the License. <module>flink-kubernetes-operator-api</module> <module>flink-kubernetes-webhook</module> <module>flink-kubernetes-docs</module> + <module>flink-autoscaler</module> <module>examples/flink-sql-runner-example</module> <module>examples/flink-beam-example</module> <module>examples/kubernetes-client-examples</module>
