mxm commented on code in PR #614:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/614#discussion_r1233570095


##########
.github/workflows/ci.yml:
##########
@@ -92,6 +95,18 @@ jobs:
         exclude:
           - namespace: default
             test: test_multi_sessionjob.sh
+          - namespace: default
+            test: test_autoscaler.sh
+          - mode: standalone
+            test: test_autoscaler.sh
+          - version: v1_13

Review Comment:
   Same as for the version matrix: should we drop support for 1.13 here as well?



##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java:
##########
@@ -85,50 +82,6 @@ public void setup() {
         jobStatus.setState(JobStatus.RUNNING.name());
     }
 
-    @Test
-    public void testStabilizationPeriod() throws Exception {

Review Comment:
   Where is the stabilization period (previously covered by `testStabilizationPeriod`) tested now?



##########
.github/workflows/ci.yml:
##########
@@ -67,7 +67,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        version: ["v1_17","v1_16","v1_15","v1_14","v1_13"]
+        version: ["v1_18","v1_17","v1_16","v1_15","v1_14","v1_13"]

Review Comment:
   Time to drop support for 1.13?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -150,6 +150,14 @@ public static String operatorConfigKey(String key) {
                     .defaultValue(false)
                     .withDescription("Whether to ignore pending savepoint 
during job upgrade.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> 
JOB_UPGRADE_INPLACE_SCALING_ENABLED =
+            operatorConfig("job.upgrade.inplace-scaling.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to enable inplace scaling for Flink 1.18+ 
using the resource requirements API.");

Review Comment:
   Suggest extending the description with the fallback behavior, e.g. "Falls back to full redeployment for Flink 1.17 and earlier."



##########
e2e-tests/test_autoscaler.sh:
##########
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="flink-autoscaler-e2e"
+APPLICATION_YAML="${SCRIPT_DIR}/data/autoscaler.yaml"
+APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
+TIMEOUT=300
+
+on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
+
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
+
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || 
exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE 
${TIMEOUT} || exit 1
+wait_for_event FlinkDeployment $CLUSTER_ID .reason==\"ScalingReport\" 
${TIMEOUT} || exit 1
+wait_for_event FlinkDeployment $CLUSTER_ID .reason==\"SpecChanged\" ${TIMEOUT} 
|| exit 1
+
+FLINK_VERSION=$(get_flink_version $APPLICATION_IDENTIFIER)
+echo "Flink version: $FLINK_VERSION"
+if [ "$FLINK_VERSION" = "v1_17" ]; then
+  echo "Verifying full upgrade triggered"
+  wait_for_event FlinkDeployment $CLUSTER_ID .reason==\"Suspended\" ${TIMEOUT} 
|| exit 1
+else
+  echo "Verifying inplace scaling triggered"
+  wait_for_event FlinkDeployment $CLUSTER_ID .reason==\"Scaling\" ${TIMEOUT} 
|| exit 1
+fi
+wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE 
${TIMEOUT} || exit 1
+
+check_operator_log_for_errors || exit 1
+
+echo "Successfully run the autoscaler test"

Review Comment:
   This is pretty cool. Thanks 🙏



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##########
@@ -165,30 +159,6 @@ private static String scalingReport(
         return sb.toString();
     }
 
-    private boolean stabilizationPeriodPassed(
-            AbstractFlinkResource<?, ?> resource, Configuration conf) {
-        var jobStatus = resource.getStatus().getJobStatus();
-
-        if (!JobStatus.RUNNING.name().equals(jobStatus.getState())) {
-            // Never consider a non-running job stable
-            return false;
-        }
-
-        var startTs =
-                Instant.ofEpochMilli(
-                        // Use the update time which will reflect the latest 
job state update
-                        // Do not use the start time because it doesn't tell 
when the job went to
-                        // RUNNING
-                        Long.parseLong(jobStatus.getUpdateTime()));
-        var stableTime = startTs.plus(conf.get(STABILIZATION_INTERVAL));

Review Comment:
   With `stabilizationPeriodPassed` removed, where is the stabilization period now checked?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to