This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4920ab25b3 Remove unnecessary validation from cncf provider. (#39238)
4920ab25b3 is described below
commit 4920ab25b3062c04222823f3c47b8d4d8be7bd97
Author: V.Shkaberda <[email protected]>
AuthorDate: Thu Apr 25 10:16:22 2024 +0300
Remove unnecessary validation from cncf provider. (#39238)
Co-authored-by: Шкаберда Вадим Миколайович <[email protected]>
---
.../kubernetes/operators/custom_object_launcher.py | 3 +--
.../operators/test_custom_object_launcher.py | 28 ++++++++++++++--------
2 files changed, 19 insertions(+), 12 deletions(-)
diff --git
a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
index 439c51e3cb..77d99a0fba 100644
--- a/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
+++ b/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
@@ -60,12 +60,11 @@ class SparkJobSpec:
if self.spec.get("dynamicAllocation", {}).get("enabled"):
if not all(
[
- self.spec["dynamicAllocation"].get("initialExecutors"),
self.spec["dynamicAllocation"].get("minExecutors"),
self.spec["dynamicAllocation"].get("maxExecutors"),
]
):
- raise AirflowException("Make sure initial/min/max value for
dynamic allocation is passed")
+ raise AirflowException("Make sure min/max value for dynamic
allocation is passed")
def update_resources(self):
if self.spec["driver"].get("container_resources"):
diff --git
a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py
b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py
index d33fdd6048..3a57fdefdb 100644
--- a/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py
+++ b/tests/providers/cncf/kubernetes/operators/test_custom_object_launcher.py
@@ -64,6 +64,22 @@ class TestSparkJobSpec:
assert spark_job_spec.spec["dynamicAllocation"]["enabled"]
+ def
test_spark_job_spec_dynamicAllocation_enabled_with_default_initial_executors(self):
+ entries = {
+ "spec": {
+ "dynamicAllocation": {
+ "enabled": True,
+ "minExecutors": 1,
+ "maxExecutors": 2,
+ },
+ "driver": {},
+ "executor": {},
+ }
+ }
+ spark_job_spec = SparkJobSpec(**entries)
+
+ assert spark_job_spec.spec["dynamicAllocation"]["enabled"]
+
def
test_spark_job_spec_dynamicAllocation_enabled_with_invalid_config(self):
entries = {
"spec": {
@@ -78,19 +94,11 @@ class TestSparkJobSpec:
}
}
- cloned_entries = entries.copy()
- cloned_entries["spec"]["dynamicAllocation"]["initialExecutors"] = None
- with pytest.raises(
- AirflowException,
- match="Make sure initial/min/max value for dynamic allocation is
passed",
- ):
- SparkJobSpec(**cloned_entries)
-
cloned_entries = entries.copy()
cloned_entries["spec"]["dynamicAllocation"]["minExecutors"] = None
with pytest.raises(
AirflowException,
- match="Make sure initial/min/max value for dynamic allocation is
passed",
+ match="Make sure min/max value for dynamic allocation is passed",
):
SparkJobSpec(**cloned_entries)
@@ -98,7 +106,7 @@ class TestSparkJobSpec:
cloned_entries["spec"]["dynamicAllocation"]["maxExecutors"] = None
with pytest.raises(
AirflowException,
- match="Make sure initial/min/max value for dynamic allocation is
passed",
+ match="Make sure min/max value for dynamic allocation is passed",
):
SparkJobSpec(**cloned_entries)