1996fanrui commented on code in PR #928:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/928#discussion_r1909685576
##########
docs/content/docs/custom-resource/reference.md:
##########
@@ -182,6 +182,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
+| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. |
Review Comment:
nits:
> This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off.
This sentence compares the new approach with the old one. Existing users will recognize it as an alternative way to reset parallelism, but for new autoscaler users it may not be needed.
If you think it is needed, would you mind moving it to the end of the paragraph?
> In order to trigger the reset behaviour simply change the nonce to a new non-null value.
How about updating it to `In order to trigger the reset behaviour, simply change the nonce or number to a different non-null value.`? That would unify the descriptions of `savepointRedeployNonce` and `autoscalerResetNonce`.
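To make the documented behaviour concrete, here is a minimal sketch of what the reset is described as doing: dropping the autoscaler-computed parallelism overrides and collected history so that each vertex falls back to the user-provided parallelism. The class, its fields, and `reset()` are hypothetical stand-ins for illustration only; they are not the operator's autoscaler or state-store API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-job autoscaler state; not the operator's real API.
final class AutoscalerResetSketch {
    // Parallelism the user configured in the spec, keyed by vertex id.
    private final Map<String, Integer> userParallelism = new HashMap<>();
    // Parallelism overrides computed by the autoscaler, keyed by vertex id.
    private final Map<String, Integer> parallelismOverrides = new HashMap<>();
    // Collected metric history, simplified to a sample counter per vertex.
    private final Map<String, Integer> metricHistory = new HashMap<>();

    void setUserParallelism(String vertex, int parallelism) {
        userParallelism.put(vertex, parallelism);
    }

    void applyOverride(String vertex, int parallelism) {
        parallelismOverrides.put(vertex, parallelism);
    }

    void recordMetric(String vertex) {
        metricHistory.merge(vertex, 1, Integer::sum);
    }

    // Effective parallelism: the autoscaler override if present, else the user value.
    int effectiveParallelism(String vertex) {
        return parallelismOverrides.getOrDefault(vertex, userParallelism.get(vertex));
    }

    int historySize() {
        return metricHistory.size();
    }

    // Reset: clear overrides and history so the job falls back to user settings.
    void reset() {
        parallelismOverrides.clear();
        metricHistory.clear();
    }

    public static void main(String[] args) {
        AutoscalerResetSketch state = new AutoscalerResetSketch();
        state.setUserParallelism("source", 4);
        state.recordMetric("source");
        state.applyOverride("source", 12);
        System.out.println(state.effectiveParallelism("source")); // 12
        state.reset();
        System.out.println(state.effectiveParallelism("source")); // 4
        System.out.println(state.historySize()); // 0
    }
}
```

Keeping the user-provided values separate from the overrides is what lets a reset restore the original settings without toggling the autoscaler off and on.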
##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -185,11 +185,26 @@ private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {
     private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
         var autoScalerCtx = ctx.getJobAutoScalerContext();
+        var resource = ctx.getResource();
         boolean autoscalerEnabled =
-                ctx.getResource().getSpec().getJob() != null
+                resource.getSpec().getJob() != null
                         && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+        var reconStatus = resource.getStatus().getReconciliationStatus();
+        if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
+            var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
+            // check if the nonce changed to a non-null value
+            if (newResetNonce != null
+                    && !newResetNonce.equals(
+                            reconStatus
+                                    .deserializeLastReconciledSpec()
+                                    .getJob()
+                                    .getAutoscalerResetNonce())) {
+                autoscaler.cleanup(autoScalerCtx.getJobKey());
Review Comment:
Thanks for the clarification. I created FLINK-37086 to track the state leak in the JDBC state store.
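The reset condition in the `applyAutoscaler` diff can be isolated into a small pure function, which makes its edge cases easy to check. This is a sketch assuming the inputs have already been extracted from the resource; `ResetNonceCheck` and `shouldReset` are hypothetical names, and the actual change inlines the check before calling `autoscaler.cleanup(...)`.

```java
// Hypothetical extraction of the reset-trigger rule from applyAutoscaler.
final class ResetNonceCheck {

    // Reset only after the first deployment, only with the autoscaler enabled,
    // and only when the nonce is non-null and differs from the last reconciled one.
    static boolean shouldReset(
            boolean beforeFirstDeployment,
            boolean autoscalerEnabled,
            Long newNonce,
            Long lastReconciledNonce) {
        if (beforeFirstDeployment || !autoscalerEnabled) {
            return false;
        }
        // Long.equals(null) is false, so a first-time non-null nonce counts as a change.
        return newNonce != null && !newNonce.equals(lastReconciledNonce);
    }

    public static void main(String[] args) {
        System.out.println(shouldReset(false, true, null, 1L)); // false: nonce cleared
        System.out.println(shouldReset(false, true, 1L, 1L)); // false: unchanged
        System.out.println(shouldReset(false, true, 2L, 1L)); // true: new value
        System.out.println(shouldReset(false, true, 1L, null)); // true: set for the first time
        System.out.println(shouldReset(true, true, 2L, 1L)); // false: not yet deployed
    }
}
```

Clearing the nonce back to `null` never triggers a reset, which matches the "non-null value" wording in the reference docs.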
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.