This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit b4e958c4700f4e1a1b38512fa09549bbcd78c95d
Author: 1996fanrui <[email protected]>
AuthorDate: Tue Sep 19 11:41:14 2023 +0800

    [FLINK-33098][autoscaler] Move non-kubernetes related autoscaler classes to flink-autoscaler module
---
 Dockerfile                                         |  6 ++-
 docker-entrypoint.sh                               |  2 +-
 flink-autoscaler/pom.xml                           | 26 ++++++++++
 .../flink}/autoscaler/ScalingMetricEvaluator.java  | 55 +++++++++++-----------
 .../apache/flink}/autoscaler/ScalingSummary.java   |  6 +--
 .../autoscaler/config/AutoScalerOptions.java       | 15 ++++--
 .../autoscaler/metrics/CollectedMetricHistory.java |  4 +-
 .../autoscaler/metrics/CollectedMetrics.java       |  2 +-
 .../org/apache/flink}/autoscaler/metrics/Edge.java |  2 +-
 .../autoscaler/metrics/EvaluatedScalingMetric.java |  2 +-
 .../flink}/autoscaler/metrics/FlinkMetric.java     |  2 +-
 .../autoscaler/metrics/MetricAggregator.java       |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetric.java   |  2 +-
 .../flink}/autoscaler/metrics/ScalingMetrics.java  |  8 ++--
 .../flink}/autoscaler/topology/JobTopology.java    |  2 +-
 .../flink}/autoscaler/topology/VertexInfo.java     |  2 +-
 .../autoscaler/utils/AutoScalerSerDeModule.java    |  4 +-
 .../flink}/autoscaler/utils/AutoScalerUtils.java   | 12 ++---
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 44 ++++++++---------
 .../autoscaler/metrics/ScalingMetricsTest.java     |  8 ++--
 .../autoscaler/topology}/JobTopologyTest.java      |  3 +-
 .../autoscaler/utils/AutoScalerUtilsTest.java      |  4 +-
 .../src/test/resources/log4j2-test.properties      | 35 ++++----------
 flink-kubernetes-docs/pom.xml                      |  2 +-
 .../configuration/ConfigOptionsDocGenerator.java   |  4 +-
 flink-kubernetes-operator-autoscaler/pom.xml       |  7 +++
 .../operator/autoscaler/AutoScalerInfo.java        |  7 +--
 .../autoscaler/AutoscalerFlinkMetrics.java         |  8 ++--
 .../operator/autoscaler/JobAutoScalerImpl.java     |  9 ++--
 .../autoscaler/JobAutoscalerFactoryImpl.java       |  1 +
 .../operator/autoscaler/JobVertexScaler.java       | 29 ++++++------
 .../autoscaler/RestApiMetricsCollector.java        |  2 +-
 .../operator/autoscaler/ScalingExecutor.java       | 33 ++++++-------
 .../autoscaler/ScalingMetricCollector.java         | 16 +++----
 .../autoscaler/AutoScalerFlinkMetricsTest.java     | 10 ++--
 .../operator/autoscaler/AutoScalerInfoTest.java    |  9 ++--
 .../autoscaler/BacklogBasedScalingTest.java        |  9 ++--
 .../operator/autoscaler/JobAutoScalerImplTest.java | 19 ++++----
 .../operator/autoscaler/JobVertexScalerTest.java   |  8 ++--
 .../MetricsCollectionAndEvaluationTest.java        | 15 +++---
 .../autoscaler/RecommendedParallelismTest.java     | 17 +++----
 .../autoscaler/RestApiMetricsCollectorTest.java    |  2 +-
 .../operator/autoscaler/ScalingExecutorTest.java   |  8 ++--
 .../autoscaler/ScalingMetricCollectorTest.java     |  9 ++--
 .../autoscaler/TestingMetricsCollector.java        |  4 +-
 45 files changed, 256 insertions(+), 220 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index fe6a85a3..4cfb8f66 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,11 +23,12 @@ WORKDIR /app
 
 COPY . .
 
-RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook
 -DskipTests=$SKIP_TESTS
+RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl 
flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-autoscaler,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook
 -DskipTests=$SKIP_TESTS
 
 RUN cd /app/tools/license; mkdir jars; cd jars; \
     cp 
/app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . 
&& \
     cp 
/app/flink-kubernetes-operator-autoscaler/target/flink-kubernetes-operator-autoscaler-*.jar
 . && \
+    cp /app/flink-autoscaler/target/flink-autoscaler-*.jar . && \
     cp 
/app/flink-kubernetes-webhook/target/flink-kubernetes-webhook-*-shaded.jar . && 
\
     cp 
/app/flink-kubernetes-standalone/target/flink-kubernetes-standalone-*.jar . && \
     cp -r /app/flink-kubernetes-operator/target/plugins ./plugins && \
@@ -39,6 +40,7 @@ ENV FLINK_HOME=/opt/flink
 ENV FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
 ENV OPERATOR_VERSION=1.7-SNAPSHOT
 ENV OPERATOR_JAR=flink-kubernetes-operator-$OPERATOR_VERSION-shaded.jar
+ENV FLINK_AUTOSCALER_JAR=flink-autoscaler-$OPERATOR_VERSION.jar
 ENV AUTOSCALER_JAR=flink-kubernetes-operator-autoscaler-$OPERATOR_VERSION.jar
 ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
 ENV KUBERNETES_STANDALONE_JAR=flink-kubernetes-standalone-$OPERATOR_VERSION.jar
@@ -51,6 +53,7 @@ RUN groupadd --system --gid=9999 flink && \
     useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=flink flink
 
 COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR .
+COPY --from=build /app/flink-autoscaler/target/$FLINK_AUTOSCALER_JAR .
 COPY --from=build 
/app/flink-kubernetes-operator-autoscaler/target/$AUTOSCALER_JAR .
 COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR .
 COPY --from=build 
/app/flink-kubernetes-standalone/target/$KUBERNETES_STANDALONE_JAR .
@@ -61,6 +64,7 @@ COPY docker-entrypoint.sh /
 
 RUN chown -R flink:flink $FLINK_HOME && \
     chown flink:flink $OPERATOR_JAR && \
+    chown flink:flink $FLINK_AUTOSCALER_JAR && \
     chown flink:flink $AUTOSCALER_JAR && \
     chown flink:flink $WEBHOOK_JAR && \
     chown flink:flink $KUBERNETES_STANDALONE_JAR && \
diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh
index 5b215bee..c46d672b 100755
--- a/docker-entrypoint.sh
+++ b/docker-entrypoint.sh
@@ -29,7 +29,7 @@ if [ "$1" = "help" ]; then
 elif [ "$1" = "operator" ]; then
     echo "Starting Operator"
 
-    exec java -cp 
"./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*"
 $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
+    exec java -cp 
"./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$FLINK_AUTOSCALER_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*"
 $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
 elif [ "$1" = "webhook" ]; then
     echo "Starting Webhook"
 
diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml
index 709e5cca..8e216ee9 100644
--- a/flink-autoscaler/pom.xml
+++ b/flink-autoscaler/pom.xml
@@ -46,6 +46,32 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- TODO FLINK-33098: These Jackson dependencies can be replaced with Flink's shaded Jackson.
+        This can be done after Flink 1.18 is released, because the shaded Jackson version
+        in Flink 1.17 is 2.13.4, which doesn't support loaderOptions. -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
similarity index 82%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index af6d1732..e5f9e3cb 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -15,44 +15,45 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.apache.commons.math3.stat.StatUtils;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Job scaling evaluator for autoscaler. */
 public class ScalingMetricEvaluator {
@@ -110,7 +111,7 @@ public class ScalingMetricEvaluator {
                         });
     }
 
-    @NotNull
+    @Nonnull
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
scalingOutput,
@@ -154,7 +155,7 @@ public class ScalingMetricEvaluator {
     }
 
     @VisibleForTesting
-    protected static void computeProcessingRateThresholds(
+    public static void computeProcessingRateThresholds(
             Map<ScalingMetric, EvaluatedScalingMetric> metrics,
             Configuration conf,
             boolean processingBacklog) {
@@ -265,7 +266,7 @@ public class ScalingMetricEvaluator {
             Edge edge, SortedMap<Instant, CollectedMetrics> metricsHistory) {
         double[] metricValues =
                 metricsHistory.values().stream()
-                        .map(m -> m.getOutputRatios())
+                        .map(CollectedMetrics::getOutputRatios)
                         .filter(m -> m.containsKey(edge))
                         .mapToDouble(m -> m.get(edge))
                         .filter(d -> !Double.isNaN(d))
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
similarity index 89%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
index 4ff7a6e9..bc25f6c4 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Data;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
similarity index 95%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 276201dc..7a9d1d90 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -15,22 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.config;
+package org.apache.flink.autoscaler.config;
 
+import org.apache.flink.autoscaler.metrics.MetricAggregator;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 import java.util.List;
 
-import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;
-
 /** Config options related to the autoscaler module. */
 public class AutoScalerOptions {
 
+    public static final String DEPRECATED_K8S_OP_CONF_PREFIX = 
"kubernetes.operator.";
+    public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler.";
+
+    public static ConfigOptions.OptionBuilder operatorConfig(String key) {
+        return ConfigOptions.key(DEPRECATED_K8S_OP_CONF_PREFIX + key);
+    }
+
     private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
-        return operatorConfig("job.autoscaler." + key);
+        return operatorConfig(AUTOSCALER_CONF_PREFIX + key);
     }
 
     public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
similarity index 88%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
index 43bbf47b..31a19879 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.JobTopology;
 
 import lombok.Data;
 import lombok.Setter;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
similarity index 94%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
index 97e3f9fe..bfd85ca2 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
 b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
similarity index 94%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
index c89692a9..1fe938b8 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
similarity index 95%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
index 23788886..c60adf18 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
similarity index 97%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 81609775..7f28d947 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
similarity index 95%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
index c79d4b7d..e147d211 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
similarity index 97%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 608e4f1c..b6a3e17d 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
 /**
  * Supported scaling metrics. These represent high level metrics computed from 
Flink job metrics
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
similarity index 97%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 592d0602..afaa21ae 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
similarity index 99%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index 32643741..49fa11ba 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
similarity index 95%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
index a261ca6d..249be598 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.topology;
+package org.apache.flink.autoscaler.topology;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
similarity index 95%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
index fad516a5..ce991403 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.Edge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonGenerator;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
similarity index 87%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
rename to 
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
index 54b6bef3..f011fcec 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.ArrayList;
@@ -30,8 +30,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
 
 /** AutoScaler utilities. */
 public class AutoScalerUtils {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
similarity index 86%
rename from 
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
rename to 
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c67b27fe..13b19445 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.Edge;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Test;
@@ -39,19 +39,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
similarity index 98%
rename from 
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
rename to 
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 0ae3e2b7..85a72e5a 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+package org.apache.flink.autoscaler.metrics;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
similarity index 96%
rename from 
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
rename to 
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
index f76fadce..f16a2711 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler;
+package org.apache.flink.autoscaler.topology;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
similarity index 94%
rename from 
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
rename to 
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
index 90588994..a037258a 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.autoscaler.utils;
+package org.apache.flink.autoscaler.utils;
 
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.Test;
diff --git a/docker-entrypoint.sh 
b/flink-autoscaler/src/test/resources/log4j2-test.properties
old mode 100755
new mode 100644
similarity index 51%
copy from docker-entrypoint.sh
copy to flink-autoscaler/src/test/resources/log4j2-test.properties
index 5b215bee..47b66644
--- a/docker-entrypoint.sh
+++ b/flink-autoscaler/src/test/resources/log4j2-test.properties
@@ -1,6 +1,4 @@
-#!/usr/bin/env bash
-
-###############################################################################
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -16,28 +14,13 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 # limitations under the License.
-###############################################################################
-
-args=("$@")
-
-cd /flink-kubernetes-operator || exit
-
-if [ "$1" = "help" ]; then
-    printf "Usage: $(basename "$0") (operator|webhook)\n"
-    printf "    Or $(basename "$0") help\n\n"
-    exit 0
-elif [ "$1" = "operator" ]; then
-    echo "Starting Operator"
-
-    exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$AUTOSCALER_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.FlinkOperator
-elif [ "$1" = "webhook" ]; then
-    echo "Starting Webhook"
-
-    # Adds the operator shaded jar on the classpath when the webhook starts
-    exec java -cp "./$KUBERNETES_STANDALONE_JAR:./$OPERATOR_JAR:./$WEBHOOK_JAR:$OPERATOR_LIB/*" $LOG_CONFIG $JVM_ARGS org.apache.flink.kubernetes.operator.admission.FlinkOperatorWebhook
-fi
+################################################################################
 
-args=("${args[@]}")
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
 
-# Running command in pass-through mode
-exec "${args[@]}"
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable}
diff --git a/flink-kubernetes-docs/pom.xml b/flink-kubernetes-docs/pom.xml
index 09e8cb38..fd42c792 100644
--- a/flink-kubernetes-docs/pom.xml
+++ b/flink-kubernetes-docs/pom.xml
@@ -45,7 +45,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-kubernetes-operator-autoscaler</artifactId>
+            <artifactId>flink-autoscaler</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
diff --git 
a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
index 2fb1e6a4..7f38f1fa 100644
--- 
a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
@@ -76,9 +76,7 @@ public class ConfigOptionsDocGenerator {
                 new OptionsClassLocation(
                         "flink-kubernetes-operator",
                         "org.apache.flink.kubernetes.operator.metrics"),
-                new OptionsClassLocation(
-                        "flink-kubernetes-operator-autoscaler",
-                        "org.apache.flink.kubernetes.operator.autoscaler.config")
+                new OptionsClassLocation("flink-autoscaler", "org.apache.flink.autoscaler.config")
             };
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
 
diff --git a/flink-kubernetes-operator-autoscaler/pom.xml 
b/flink-kubernetes-operator-autoscaler/pom.xml
index 1156855b..2a6498ca 100644
--- a/flink-kubernetes-operator-autoscaler/pom.xml
+++ b/flink-kubernetes-operator-autoscaler/pom.xml
@@ -36,6 +36,13 @@ under the License.
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-autoscaler</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.fabric8</groupId>
             <artifactId>kubernetes-client</artifactId>
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 55c0b0ac..c37b299f 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -18,11 +18,12 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import 
org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
index 56768e18..eab0f850 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -18,8 +18,8 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -32,8 +32,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 
 /** Autoscaler metrics for observability. */
 public class AutoscalerFlinkMetrics {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index 402f88bf..87603fce 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -41,9 +42,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
 
 /** Application and SessionJob autoscaler. */
 public class JobAutoScalerImpl implements JobAutoScaler {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
index f6a7afc8..e0934a30 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index fec126af..2fdc3e88 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -18,12 +18,13 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -38,16 +39,16 @@ import java.time.ZoneId;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Component responsible for computing vertex parallelism based on the scaling metrics. */
 public class JobVertexScaler {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
index 158bb5a2..dca52a0c 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java
@@ -18,9 +18,9 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 6367381c..32859a43 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -18,11 +18,12 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -39,12 +40,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Class responsible for executing scaling decisions. */
 public class ScalingExecutor {
@@ -128,13 +129,13 @@ public class ScalingExecutor {
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
             Map<JobVertexID, ScalingSummary> scalingSummaries) {
         scalingSummaries.forEach(
-                (jobVertexID, scalingSummary) -> {
-                    evaluatedMetrics
-                            .get(jobVertexID)
-                            .put(
-                                    ScalingMetric.RECOMMENDED_PARALLELISM,
-                                    EvaluatedScalingMetric.of(scalingSummary.getNewParallelism()));
-                });
+                (jobVertexID, scalingSummary) ->
+                        evaluatedMetrics
+                                .get(jobVertexID)
+                                .put(
+                                        ScalingMetric.RECOMMENDED_PARALLELISM,
+                                        EvaluatedScalingMetric.of(
+                                                scalingSummary.getNewParallelism())));
     }
 
     private static String scalingReport(
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 3ad1db39..e620fb0e 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -19,17 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetrics;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
index 2600bd60..22aa1462 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -32,14 +32,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.AVERAGE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.CURRENT;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.JOB_VERTEX_ID;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** {@link AutoscalerFlinkMetrics} tests. */
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index cfc2e7f5..94ada9fb 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 31727983..9bdf777c 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -19,14 +19,15 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
index 9a0c8c44..7316668b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java
@@ -19,16 +19,17 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -54,8 +55,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -87,7 +88,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase {
     }
 
     @Test
-    void testMetricReporting() throws Exception {
+    void testMetricReporting() {
         JobVertexID jobVertexID = new JobVertexID();
         JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, 
Set.of(), 1, 10));
 
@@ -136,7 +137,7 @@ public class JobAutoScalerImplTest extends OperatorTestBase 
{
     }
 
     @Test
-    void testErrorReporting() throws Exception {
+    void testErrorReporting() {
         var autoscaler = new JobAutoScalerImpl(null, null, null, 
eventRecorder);
         FlinkResourceContext<FlinkDeployment> resourceContext = 
getResourceContext(app);
         ResourceID resourceId = ResourceID.fromResource(app);
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 8bff55be..a700f89e 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index a01195c2..f8cec403 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -19,17 +19,18 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
index b8d1528b..e11d76e9 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java
@@ -19,15 +19,16 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
@@ -52,9 +53,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -125,7 +126,7 @@ public class RecommendedParallelismTest extends 
OperatorTestBase {
     }
 
     @Test
-    public void endToEnd() throws Exception {
+    public void endToEnd() {
 
         // we start the autoscaler in advisor mode
         
app.getSpec().getFlinkConfiguration().put(AutoScalerOptions.SCALING_ENABLED.key(),
 "false");
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
index 04d83107..9f3cb451 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java
@@ -18,11 +18,11 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index c315300b..d66d10e8 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -18,13 +18,15 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.ScalingMetricEvaluator;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
-import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.utils.EventCollector;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
index ddf26453..d6c8f14f 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java
@@ -19,13 +19,13 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
@@ -124,8 +124,7 @@ public class ScalingMetricCollectorTest {
                 new RestApiMetricsCollector() {
                     @Override
                     protected Collection<AggregatedMetric> 
queryAggregatedMetricNames(
-                            RestClusterClient<?> restClient, JobID jobID, 
JobVertexID jobVertexID)
-                            throws Exception {
+                            RestClusterClient<?> restClient, JobID jobID, 
JobVertexID jobVertexID) {
                         return metricList;
                     }
                 };
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
index 9a3c1d13..277ee33b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java
@@ -18,11 +18,11 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

Reply via email to