This is an automated email from the ASF dual-hosted git repository.
joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1005501365 Used cached property for hook in SparkKubernetesOperator (#34130)
1005501365 is described below
commit 10055013652be3e76c964cbc844b075bb688f088
Author: Vijay Jangir <[email protected]>
AuthorDate: Thu Sep 7 00:54:29 2023 +0530
Used cached property for hook in SparkKubernetesOperator (#34130)
---
.../cncf/kubernetes/operators/spark_kubernetes.py | 5 ++++-
.../kubernetes/operators/test_spark_kubernetes.py | 19 +++++++++++++++++--
2 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 42913a23de..078c9e97e1 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from kubernetes.client import ApiException
@@ -81,7 +82,9 @@ class SparkKubernetesOperator(BaseOperator):
self.config_file = config_file
self.watch = watch
- self.hook = KubernetesHook(
+ @cached_property
+ def hook(self) -> KubernetesHook:
+ return KubernetesHook(
conn_id=self.kubernetes_conn_id,
in_cluster=self.in_cluster,
config_file=self.config_file,
diff --git a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
index c7f30dad24..242967928e 100644
--- a/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
@@ -28,7 +28,7 @@ from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKu
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
def test_spark_kubernetes_operator(mock_kubernetes_hook):
- SparkKubernetesOperator(
+ operator = SparkKubernetesOperator(
task_id="task_id",
application_file="application_file",
kubernetes_conn_id="kubernetes_conn_id",
@@ -36,8 +36,23 @@ def test_spark_kubernetes_operator(mock_kubernetes_hook):
cluster_context="cluster_context",
config_file="config_file",
)
+ mock_kubernetes_hook.assert_not_called() # constructor shouldn't call the hook
- mock_kubernetes_hook.assert_called_once_with(
+ assert "hook" not in operator.__dict__ # Cached property has not been accessed as part of construction.
+
+
+@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
+def test_spark_kubernetes_operator_hook(mock_kubernetes_hook):
+ operator = SparkKubernetesOperator(
+ task_id="task_id",
+ application_file="application_file",
+ kubernetes_conn_id="kubernetes_conn_id",
+ in_cluster=True,
+ cluster_context="cluster_context",
+ config_file="config_file",
+ )
+ operator.hook
+ mock_kubernetes_hook.assert_called_with(
conn_id="kubernetes_conn_id",
in_cluster=True,
cluster_context="cluster_context",