This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new f5d9b191 [FLINK-34398] Consider FlinkSessionJob's flinkConfiguration overrides during session job spec validation f5d9b191 is described below commit f5d9b1917aec875bf04c35d6d3debd93f620c76d Author: Dominik Dębowczyk <dominik.debowc...@getindata.com> AuthorDate: Tue Feb 6 14:23:50 2024 +0100 [FLINK-34398] Consider FlinkSessionJob's flinkConfiguration overrides during session job spec validation --- .../operator/validation/DefaultValidator.java | 4 ++++ .../operator/validation/DefaultValidatorTest.java | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java index 2ead0686..3c9035df 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java @@ -497,6 +497,10 @@ public class DefaultValidator implements FlinkResourceValidator { effectiveConfig.putAll(sessionCluster.getSpec().getFlinkConfiguration()); } + if (sessionJob.getSpec().getFlinkConfiguration() != null) { + effectiveConfig.putAll(sessionJob.getSpec().getFlinkConfiguration()); + } + return firstPresent( validateNotApplicationCluster(sessionCluster), validateSessionClusterId(sessionJob, sessionCluster), diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java index fc961bba..c924c71e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java @@ -745,6 +745,23 @@ public class DefaultValidatorTest { }, flinkDeployment -> {}, "The deploymentName can't be changed"); + + testSessionJobValidateWithModifier( + sessionJob -> { + sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); + sessionJob + .getSpec() + .setFlinkConfiguration( + Map.of( + CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), + "test-savepoint-dir", + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), + "test-checkpoint-dir")); + }, + flinkDeployment -> { + flinkDeployment.getSpec().setFlinkConfiguration(Map.of()); + }, + null); } private void testSessionJobValidateWithModifier(