This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit b4e958c4700f4e1a1b38512fa09549bbcd78c95d Author: 1996fanrui <[email protected]> AuthorDate: Tue Sep 19 11:41:14 2023 +0800 [FLINK-33098][autoscaler] Move non-kubernetes related autoscaler classes to flink-autoscaler module --- Dockerfile | 6 ++- docker-entrypoint.sh | 2 +- flink-autoscaler/pom.xml | 26 ++++++++++ .../flink}/autoscaler/ScalingMetricEvaluator.java | 55 +++++++++++----------- .../apache/flink}/autoscaler/ScalingSummary.java | 6 +-- .../autoscaler/config/AutoScalerOptions.java | 15 ++++-- .../autoscaler/metrics/CollectedMetricHistory.java | 4 +- .../autoscaler/metrics/CollectedMetrics.java | 2 +- .../org/apache/flink}/autoscaler/metrics/Edge.java | 2 +- .../autoscaler/metrics/EvaluatedScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/FlinkMetric.java | 2 +- .../autoscaler/metrics/MetricAggregator.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetric.java | 2 +- .../flink}/autoscaler/metrics/ScalingMetrics.java | 8 ++-- .../flink}/autoscaler/topology/JobTopology.java | 2 +- .../flink}/autoscaler/topology/VertexInfo.java | 2 +- .../autoscaler/utils/AutoScalerSerDeModule.java | 4 +- .../flink}/autoscaler/utils/AutoScalerUtils.java | 12 ++--- .../autoscaler/ScalingMetricEvaluatorTest.java | 44 ++++++++--------- .../autoscaler/metrics/ScalingMetricsTest.java | 8 ++-- .../autoscaler/topology}/JobTopologyTest.java | 3 +- .../autoscaler/utils/AutoScalerUtilsTest.java | 4 +- .../src/test/resources/log4j2-test.properties | 35 ++++---------- flink-kubernetes-docs/pom.xml | 2 +- .../configuration/ConfigOptionsDocGenerator.java | 4 +- flink-kubernetes-operator-autoscaler/pom.xml | 7 +++ .../operator/autoscaler/AutoScalerInfo.java | 7 +-- .../autoscaler/AutoscalerFlinkMetrics.java | 8 ++-- .../operator/autoscaler/JobAutoScalerImpl.java | 9 ++-- .../autoscaler/JobAutoscalerFactoryImpl.java | 1 + .../operator/autoscaler/JobVertexScaler.java | 29 ++++++------ .../autoscaler/RestApiMetricsCollector.java | 2 +- .../operator/autoscaler/ScalingExecutor.java | 33 ++++++------- .../autoscaler/ScalingMetricCollector.java | 16 +++---- .../autoscaler/AutoScalerFlinkMetricsTest.java | 10 ++-- .../operator/autoscaler/AutoScalerInfoTest.java | 9 ++-- .../autoscaler/BacklogBasedScalingTest.java | 9 ++-- .../operator/autoscaler/JobAutoScalerImplTest.java | 19 ++++---- .../operator/autoscaler/JobVertexScalerTest.java | 8 ++-- .../MetricsCollectionAndEvaluationTest.java | 15 +++--- .../autoscaler/RecommendedParallelismTest.java | 17 +++---- .../autoscaler/RestApiMetricsCollectorTest.java | 2 +- .../operator/autoscaler/ScalingExecutorTest.java | 8 ++-- .../autoscaler/ScalingMetricCollectorTest.java | 9 ++-- .../autoscaler/TestingMetricsCollector.java | 4 +- 45 files changed, 256 insertions(+), 220 deletions(-) diff --git a/Dockerfile b/Dockerfile index fe6a85a3..4cfb8f66 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,11 +23,12 @@ WORKDIR /app COPY . . -RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS +RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-autoscaler,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS RUN cd /app/tools/license; mkdir jars; cd jars; \ cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \ cp /app/flink-kubernetes-operator-autoscaler/target/flink-kubernetes-operator-autoscaler-*.jar . && \ + cp /app/flink-autoscaler/target/flink-autoscaler-*.jar . && \ cp /app/flink-kubernetes-webhook/target/flink-kubernetes-webhook-*-shaded.jar . && \ cp /app/flink-kubernetes-standalone/target/flink-kubernetes-standalone-*.jar . && \ cp -r /app/flink-kubernetes-operator/target/plugins ./plugins && \ @@ -39,6 +40,7 @@ ENV FLINK_HOME=/opt/flink ENV FLINK_PLUGINS_DIR=$FLINK_HOME/plugins ENV OPERATOR_VERSION=1.7-SNAPSHOT ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar +ENV FLINK_AUTOSCALER_JAR=flink-autoscaler-$OPERATOR_VERSION.jar ENV AUTOSCALER_JAR=flink-kubernetes-operator-autoscaler-$OPERATOR_VERSION.jar ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar ENV KUBERNETES_STANDALONE_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION.jar @@ -51,6 +53,7 @@ RUN groupadd --system --gid=9999 flink && \ useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR . +COPY --from=build /app/flink-autoscaler/target/$FLINK_AUTOSCALER_JAR . COPY --from=build /app/flink-kubernetes-operator-autoscaler/target/$AUTOSCALER_JAR . COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR . COPY --from=build /app/flink-kubernetes-standalone/target/$KUBERNETES_STANDALONE_JAR . @@ -61,6 +64,7 @@ COPY docker-entrypoint.sh / RUN chown -R flink:flink $FLINK_HOME && \ chown flink:flink $OPERATOR_JAR && \ + chown flink:flink $FLINK_AUTOSCALER_JAR && \ chown flink:flink $AUTOSCALER_JAR && \ chown flink:flink $WEBHOOK_JAR && \ chown flink:flink $KUBERNETES_STANDALONE_JAR && \ diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 5b215bee..c46d672b 100755 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -29,7 +29,7 @@ if [ "$1" = "help" ]; then elif [ "$1" = "operator" ]; then echo "Starting Operator" - exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator + exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$FLINK_AUTOSCALER_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator elif [ "$1" = "webhook" ]; then echo "Starting Webhook" diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml index 709e5cca..8e216ee9 100644 --- a/flink-autoscaler/pom.xml +++ b/flink-autoscaler/pom.xml @@ -46,6 +46,32 @@ under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>${lombok.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> + + <!-- TODO FLINK-33098: These jackson dependencies can be replaced with flink shaded jackson. It can be done + after the flink-1.18 is released, because the shaded jackson version of flink-1.17 is 2.13.4, + it doesn't support loaderOptions. --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + </dependencies> </project> diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java similarity index 82% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index af6d1732..e5f9e3cb 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -15,44 +15,45 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.commons.math3.stat.StatUtils; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Job scaling evaluator for autoscaler. */ public class ScalingMetricEvaluator { @@ -110,7 +111,7 @@ public class ScalingMetricEvaluator { }); } - @NotNull + @Nonnull private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics( Configuration conf, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput, @@ -154,7 +155,7 @@ public class ScalingMetricEvaluator { } @VisibleForTesting - protected static void computeProcessingRateThresholds( + public static void computeProcessingRateThresholds( Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration conf, boolean processingBacklog) { @@ -265,7 +266,7 @@ public class ScalingMetricEvaluator { Edge edge, SortedMap<Instant, CollectedMetrics> metricsHistory) { double[] metricValues = metricsHistory.values().stream() - .map(m -> m.getOutputRatios()) + .map(CollectedMetrics::getOutputRatios) .filter(m -> m.containsKey(edge)) .mapToDouble(m -> m.get(edge)) .filter(d -> !Double.isNaN(d)) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java similarity index 89% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java index 4ff7a6e9..bc25f6c4 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 276201dc..7a9d1d90 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -15,22 +15,27 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.config; +package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator; import java.time.Duration; import java.util.List; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; - /** Config options related to the autoscaler module. */ public class AutoScalerOptions { + public static final String DEPRECATED_K8S_OP_CONF_PREFIX = "kubernetes.operator."; + public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; + + public static ConfigOptions.OptionBuilder operatorConfig(String key) { + return ConfigOptions.key(DEPRECATED_K8S_OP_CONF_PREFIX + key); + } + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { - return operatorConfig("job.autoscaler." + key); + return operatorConfig(AUTOSCALER_CONF_PREFIX + key); } public static final ConfigOption<Boolean> AUTOSCALER_ENABLED = diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java similarity index 88% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java index 43bbf47b..31a19879 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.JobTopology; import lombok.Data; import lombok.Setter; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java index 97e3f9fe..bfd85ca2 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java index c89692a9..1fe938b8 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java index 23788886..c60adf18 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import lombok.Data; import lombok.NoArgsConstructor; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java index 81609775..7f28d947 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java index c79d4b7d..e147d211 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java index 608e4f1c..b6a3e17d 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; /** * Supported scaling metrics. These represent high level metrics computed from Flink job metrics diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java index 592d0602..afaa21ae 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java similarity index 99% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java index 32643741..49fa11ba 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java index a261ca6d..249be598 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java index fad516a5..ce991403 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.Edge; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java similarity index 87% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index 54b6bef3..f011fcec 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.ArrayList; @@ -30,8 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; /** AutoScaler utilities. */ public class AutoScalerUtils { diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java similarity index 86% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index c67b27fe..13b19445 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; @@ -39,19 +39,19 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java similarity index 98% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java index 0ae3e2b7..85a72e5a 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java similarity index 96% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java index f76fadce..f16a2711 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java @@ -15,9 +15,8 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler.topology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java index 90588994..a037258a 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; diff --git a/docker-entrypoint.sh b/flink-autoscaler/src/test/resources/log4j2-test.properties old mode 100755 new mode 100644 similarity index 51% copy from docker-entrypoint.sh copy to flink-autoscaler/src/test/resources/log4j2-test.properties index 5b215bee..47b66644 --- a/docker-entrypoint.sh +++ b/flink-autoscaler/src/test/resources/log4j2-test.properties @@ -1,6 +1,4 @@ -#!/usr/bin/env bash - -############################################################################### +################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,28 +14,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -############################################################################### - -args=("$@") - -cd /flink-kubernetes-operator || exit - -if [ "$1" = "help" ]; then - printf "Usage: $(basename "$0") (operator|webhook)\n" - printf " Or $(basename "$0") help\n\n" - exit 0 -elif [ "$1" = "operator" ]; then - echo "Starting Operator" - - exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator -elif [ "$1" = "webhook" ]; then - echo "Starting Webhook" - - # Adds the operator shaded jar on the classpath when the webhook starts - exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$WEBHOOK_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.admission.FlinkOperatorWebhook -fi +################################################################################ -args=("${args[@]}") +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender -# Running command in pass-through mode -exec "${args[@]}" +# Log all infos to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable} diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml index 09e8cb38..fd42c792 100644 --- a/flink-kubernetes-docs/pom.xml +++ b/flink-kubernetes-docs/pom.xml @@ -45,7 +45,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-kubernetes-operator-autoscaler</artifactId> + <artifactId>flink-autoscaler</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java index 2fb1e6a4..7f38f1fa 100644 --- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java @@ -76,9 +76,7 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation( "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.metrics"), - new OptionsClassLocation( - "flink-kubernetes-operator-autoscaler", - "org.apache.flink.kubernetes.operator.autoscaler.config") + new OptionsClassLocation("flink-autoscaler", "org.apache.flink.autoscaler.config") }; static final String DEFAULT_PATH_PREFIX = "src/main/java"; diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml index 1156855b..2a6498ca 100644 --- a/flink-kubernetes-operator-autoscaler/pom.xml +++ b/flink-kubernetes-operator-autoscaler/pom.xml @@ -36,6 +36,13 @@ under the License. </properties> <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-autoscaler</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java index 55c0b0ac..c37b299f 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java index 56768e18..eab0f850 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java @@ -18,8 +18,8 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -32,8 +32,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Supplier; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; /** Autoscaler metrics for observability. */ public class AutoscalerFlinkMetrics { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index 402f88bf..87603fce 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -41,9 +42,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; /** Application and SessionJob autoscaler. */ public class JobAutoScalerImpl implements JobAutoScaler { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java index f6a7afc8..e0934a30 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java index fec126af..2fdc3e88 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java @@ -18,12 +18,13 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -38,16 +39,16 @@ import java.time.ZoneId; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Component responsible for computing vertex parallelism based on the scaling metrics. */ public class JobVertexScaler { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java index 158bb5a2..dca52a0c 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java @@ -18,9 +18,9 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index 6367381c..32859a43 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -39,12 +40,12 @@ import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Class responsible for executing scaling decisions. */ public class ScalingExecutor { @@ -128,13 +129,13 @@ public class ScalingExecutor { Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries) { scalingSummaries.forEach( - (jobVertexID, scalingSummary) -> { - evaluatedMetrics - .get(jobVertexID) - .put( - ScalingMetric.RECOMMENDED_PARALLELISM, - EvaluatedScalingMetric.of(scalingSummary.getNewParallelism())); - }); + (jobVertexID, scalingSummary) -> + evaluatedMetrics + .get(jobVertexID) + .put( + ScalingMetric.RECOMMENDED_PARALLELISM, + EvaluatedScalingMetric.of( + scalingSummary.getNewParallelism()))); } private static String scalingReport( diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java index 3ad1db39..e620fb0e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java @@ -19,17 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetrics; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java index 2600bd60..22aa1462 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java @@ -17,10 +17,10 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -32,14 +32,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.AVERAGE; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.CURRENT; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.JOB_VERTEX_ID; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.junit.jupiter.api.Assertions.assertEquals; /** {@link AutoscalerFlinkMetrics} tests. */ diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java index cfc2e7f5..94ada9fb 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java @@ -17,11 +17,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index 31727983..9bdf777c 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -19,14 +19,15 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java index 9a0c8c44..7316668b 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java @@ -19,16 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; @@ -54,8 +55,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -87,7 +88,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase { } @Test - void testMetricReporting() throws Exception { + void testMetricReporting() { JobVertexID jobVertexID = new JobVertexID(); JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, Set.of(), 1, 10)); @@ -136,7 +137,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase { } @Test - void testErrorReporting() throws Exception { + void testErrorReporting() { var autoscaler = new JobAutoScalerImpl(null, null, null, eventRecorder); FlinkResourceContext<FlinkDeployment> resourceContext = getResourceContext(app); ResourceID resourceId = ResourceID.fromResource(app); diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java index 8bff55be..a700f89e 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java @@ -17,12 +17,14 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java index a01195c2..f8cec403 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -19,17 +19,18 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java index b8d1528b..e11d76e9 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java @@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -52,9 +53,9 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -125,7 +126,7 @@ public class RecommendedParallelismTest extends OperatorTestBase { } @Test - public void endToEnd() throws Exception { + public void endToEnd() { // we start the autoscaler in advisor mode app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(), "false"); diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java index 04d83107..9f3cb451 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index c315300b..d66d10e8 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -18,13 +18,15 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java index ddf26453..d6c8f14f 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java @@ -19,13 +19,13 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; @@ -124,8 +124,7 @@ public class ScalingMetricCollectorTest { new RestApiMetricsCollector() { @Override protected Collection<AggregatedMetric> queryAggregatedMetricNames( - RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) - throws Exception { + RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) { return metricList; } }; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java index 9a3c1d13..277ee33b 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
