This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c97e5be48 add container_name option for SparkKubernetesSensor (#26560)
5c97e5be48 is described below
commit 5c97e5be484ff572070b0ad320c5936bc028be93
Author: hanna-liashchuk <[email protected]>
AuthorDate: Mon Oct 10 08:36:19 2022 +0300
add container_name option for SparkKubernetesSensor (#26560)
* [ISSUE-18468] add container_name option for SparkKubernetesSensor
---
.../cncf/kubernetes/sensors/spark_kubernetes.py | 7 +-
.../kubernetes/sensors/test_spark_kubernetes.py | 96 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 8cfdbf753c..8879920265 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -39,6 +39,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
:param application_name: spark Application resource name
:param namespace: the kubernetes namespace where the sparkApplication
reside in
+ :param container_name: the name of the kubernetes container in the
driver pod from which logs are fetched
:param kubernetes_conn_id: The :ref:`kubernetes
connection<howto/connection:kubernetes>`
to Kubernetes cluster.
:param attach_log: determines whether logs for driver pod should be
appended to the sensor log
@@ -56,6 +57,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
application_name: str,
attach_log: bool = False,
namespace: str | None = None,
+ container_name: str = "spark-kubernetes-driver",
kubernetes_conn_id: str = "kubernetes_default",
api_group: str = 'sparkoperator.k8s.io',
api_version: str = 'v1beta2',
@@ -65,6 +67,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
self.application_name = application_name
self.attach_log = attach_log
self.namespace = namespace
+ self.container_name = container_name
self.kubernetes_conn_id = kubernetes_conn_id
self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
self.api_group = api_group
@@ -84,7 +87,9 @@ class SparkKubernetesSensor(BaseSensorOperator):
log_method = self.log.error if application_state in
self.FAILURE_STATES else self.log.info
try:
log = ""
- for line in self.hook.get_pod_logs(driver_pod_name,
namespace=namespace):
+ for line in self.hook.get_pod_logs(
+ driver_pod_name, namespace=namespace,
container=self.container_name
+ ):
log += line.decode()
log_method(log)
except client.rest.ApiException as e:
diff --git a/tests/providers/cncf/kubernetes/sensors/test_spark_kubernetes.py
b/tests/providers/cncf/kubernetes/sensors/test_spark_kubernetes.py
index c8f6764cac..4f69e73a58 100644
--- a/tests/providers/cncf/kubernetes/sensors/test_spark_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/sensors/test_spark_kubernetes.py
@@ -483,6 +483,68 @@ TEST_PENDING_RERUN_APPLICATION = {
},
}
+TEST_DRIVER_WITH_SIDECAR_APPLICATION = {
+ "apiVersion": "sparkoperator.k8s.io/v1beta2",
+ "kind": "SparkApplication",
+ "metadata": {
+ "creationTimestamp": "2020-02-24T07:34:22Z",
+ "generation": 1,
+ "labels": {"spark_flow_name": "spark-pi"},
+ "name": "spark-pi-2020-02-24-1",
+ "namespace": "default",
+ "resourceVersion": "455577",
+ "selfLink":
"/apis/sparkoperator.k8s.io/v1beta2/namespaces/default/sparkapplications/spark-pi",
+ "uid": "9f825516-6e1a-4af1-8967-b05661e8fb08",
+ },
+ "spec": {
+ "driver": {
+ "coreLimit": "1200m",
+ "cores": 1,
+ "labels": {"spark_flow_name": "spark-pi", "version": "2.4.4"},
+ "memory": "512m",
+ "serviceAccount": "default",
+ "volumeMounts": [{"mountPath": "/tmp", "name": "test-volume"}],
+ "sidecars": [{"name": "sidecar1", "image": "hello-world:latest"}],
+ },
+ "executor": {
+ "cores": 1,
+ "instances": 3,
+ "labels": {"spark_flow_name": "spark-pi", "version": "2.4.4"},
+ "memory": "512m",
+ "volumeMounts": [{"mountPath": "/tmp", "name": "test-volume"}],
+ },
+ "image": "gcr.io/spark-operator/spark:v2.4.4",
+ "imagePullPolicy": "Always",
+ "mainApplicationFile":
"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar",
+ "mainClass": "org.apache.spark.examples.SparkPi",
+ "mode": "cluster",
+ "restartPolicy": {"type": "Never"},
+ "sparkVersion": "2.4.4",
+ "type": "Scala",
+ "volumes": [{"hostPath": {"path": "/tmp", "type": "Directory"},
"name": "test-volume"}],
+ },
+ "status": {
+ "applicationState": {"state": "COMPLETED"},
+ "driverInfo": {
+ "podName": "spark-pi-2020-02-24-1-driver",
+ "webUIAddress": "10.97.130.44:4040",
+ "webUIPort": 4040,
+ "webUIServiceName": "spark-pi-2020-02-24-1-ui-svc",
+ },
+ "executionAttempts": 1,
+ "executorState": {
+ "spark-pi-2020-02-24-1-1582529666227-exec-1": "FAILED",
+ "spark-pi-2020-02-24-1-1582529666227-exec-2": "FAILED",
+ "spark-pi-2020-02-24-1-1582529666227-exec-3": "FAILED",
+ },
+ "lastSubmissionAttemptTime": "2020-02-24T07:34:30Z",
+ "sparkApplicationId": "spark-7bb432c422ca46f3854838c419460fec",
+ "submissionAttempts": 1,
+ "submissionID": "1a1f9c5e-6bdd-4824-806f-40a814c1cf43",
+ "terminationTime": "2020-02-24T07:35:01Z",
+ },
+}
+
TEST_POD_LOGS = [b"LOG LINE 1\n", b"LOG LINE 2"]
TEST_POD_LOG_RESULT = "LOG LINE 1\nLOG LINE 2"
@@ -726,7 +788,9 @@ class TestSparkKubernetesSensor(unittest.TestCase):
)
with pytest.raises(AirflowException):
sensor.poke(None)
- mock_log_call.assert_called_once_with("spark-pi-driver",
namespace="default")
+ mock_log_call.assert_called_once_with(
+ "spark-pi-driver", namespace="default",
container='spark-kubernetes-driver'
+ )
error_log_call.assert_called_once_with(TEST_POD_LOG_RESULT)
@patch(
@@ -748,7 +812,9 @@ class TestSparkKubernetesSensor(unittest.TestCase):
task_id="test_task_id",
)
sensor.poke(None)
- mock_log_call.assert_called_once_with("spark-pi-2020-02-24-1-driver",
namespace="default")
+ mock_log_call.assert_called_once_with(
+ "spark-pi-2020-02-24-1-driver", namespace="default",
container='spark-kubernetes-driver'
+ )
log_info_call = info_log_call.mock_calls[2]
log_value = log_info_call[1][0]
assert log_value == TEST_POD_LOG_RESULT
@@ -773,3 +839,29 @@ class TestSparkKubernetesSensor(unittest.TestCase):
)
sensor.poke(None)
warn_log_call.assert_called_once()
+
+ @patch(
+
"kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
+ return_value=TEST_DRIVER_WITH_SIDECAR_APPLICATION,
+ )
+ @patch("logging.Logger.info")
+ @patch(
+
"airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod_logs",
+ return_value=TEST_POD_LOGS,
+ )
+ def test_sidecar_driver_logging_completed(
+ self, mock_log_call, info_log_call, mock_get_namespaced_crd,
mock_kube_conn
+ ):
+ sensor = SparkKubernetesSensor(
+ application_name="spark_pi",
+ attach_log=True,
+ dag=self.dag,
+ task_id="test_task_id",
+ )
+ sensor.poke(None)
+ mock_log_call.assert_called_once_with(
+ "spark-pi-2020-02-24-1-driver", namespace="default",
container='spark-kubernetes-driver'
+ )
+ log_info_call = info_log_call.mock_calls[2]
+ log_value = log_info_call[1][0]
+ assert log_value == TEST_POD_LOG_RESULT