This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f5d9b191 [FLINK-34398] Consider FlinkSessionJob's flinkConfiguration
overrides during session job spec validation
f5d9b191 is described below
commit f5d9b1917aec875bf04c35d6d3debd93f620c76d
Author: Dominik Dębowczyk <[email protected]>
AuthorDate: Tue Feb 6 14:23:50 2024 +0100
[FLINK-34398] Consider FlinkSessionJob's flinkConfiguration overrides
during session job spec validation
---
.../operator/validation/DefaultValidator.java | 4 ++++
.../operator/validation/DefaultValidatorTest.java | 17 +++++++++++++++++
2 files changed, 21 insertions(+)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 2ead0686..3c9035df 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -497,6 +497,10 @@ public class DefaultValidator implements
FlinkResourceValidator {
effectiveConfig.putAll(sessionCluster.getSpec().getFlinkConfiguration());
}
+ if (sessionJob.getSpec().getFlinkConfiguration() != null) {
+
effectiveConfig.putAll(sessionJob.getSpec().getFlinkConfiguration());
+ }
+
return firstPresent(
validateNotApplicationCluster(sessionCluster),
validateSessionClusterId(sessionJob, sessionCluster),
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index fc961bba..c924c71e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -745,6 +745,23 @@ public class DefaultValidatorTest {
},
flinkDeployment -> {},
"The deploymentName can't be changed");
+
+ testSessionJobValidateWithModifier(
+ sessionJob -> {
+
sessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ sessionJob
+ .getSpec()
+ .setFlinkConfiguration(
+ Map.of(
+
CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
+ "test-savepoint-dir",
+
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
+ "test-checkpoint-dir"));
+ },
+ flinkDeployment -> {
+ flinkDeployment.getSpec().setFlinkConfiguration(Map.of());
+ },
+ null);
}
private void testSessionJobValidateWithModifier(