1996fanrui commented on code in PR #928:
URL: https://github.com/apache/flink-kubernetes-operator/pull/928#discussion_r1909685576


##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -182,6 +182,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
 | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
 | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
 | savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
+| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. |

Review Comment:
   nits:
   
   > This can be used to quickly go back to the initial user-provided 
parallelism settings without having to toggle the autoscaler on and off.
   
   This sentence compares the new approach with the old one. Existing users 
already know the old way to reset parallelism, and new autoscaler users 
may not need the comparison at all.
   
   Or if you think it's needed, would you mind moving it to the end of the 
paragraph?
   
   > In order to trigger the reset behaviour simply change the nonce to a new 
non-null value.
   
   How about updating it to `In order to trigger the reset behaviour, simply 
change the nonce or number to a different non-null value.`? It will unify the 
descriptions of `savepointRedeployNonce` and `autoscalerResetNonce`.
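
   To make "a different non-null value" concrete, here is a minimal sketch 
of the comparison the wording implies. `NonceCheck` and `nonceChanged` are 
hypothetical names used only for illustration and are not part of the operator 
code base; the logic mirrors the null check and `equals` comparison in the 
reconciler change under review, as I read it.

```java
/** Hypothetical helper for illustration only; not part of the operator. */
public final class NonceCheck {

    private NonceCheck() {}

    /**
     * Returns true only when the new nonce is non-null and differs from the
     * last reconciled nonce (which may itself be null).
     */
    public static boolean nonceChanged(Long lastReconciled, Long newNonce) {
        return newNonce != null && !newNonce.equals(lastReconciled);
    }

    public static void main(String[] args) {
        System.out.println(nonceChanged(null, 1L)); // first time set: true
        System.out.println(nonceChanged(1L, 1L));   // unchanged: false
        System.out.println(nonceChanged(1L, 2L));   // changed: true
        System.out.println(nonceChanged(1L, null)); // removed: false
    }
}
```

   Under this reading, removing the nonce (setting it back to null) would not 
trigger a reset, which is why the doc text should say "different non-null 
value" for both fields.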



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -185,11 +185,26 @@ private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {
 
     private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
         var autoScalerCtx = ctx.getJobAutoScalerContext();
+        var resource = ctx.getResource();
         boolean autoscalerEnabled =
-                ctx.getResource().getSpec().getJob() != null
+                resource.getSpec().getJob() != null
                         && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
 
+        var reconStatus = resource.getStatus().getReconciliationStatus();
+        if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
+            var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
+            // check if the nonce changed to a non-null value
+            if (newResetNonce != null
+                    && !newResetNonce.equals(
+                            reconStatus
+                                    .deserializeLastReconciledSpec()
+                                    .getJob()
+                                    .getAutoscalerResetNonce())) {
+                autoscaler.cleanup(autoScalerCtx.getJobKey());

Review Comment:
   Thanks for the clarification. I created FLINK-37086 to track the state leak 
in the JDBC state store.


