captify-mkambur opened a new issue, #42132:
URL: https://github.com/apache/airflow/issues/42132
### Apache Airflow version
2.10.0
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I'm trying to run a Spark application with a custom image using Airflow
2.10.0, using a Kubernetes cluster connection (EKS). The
apache-airflow-providers-cncf-kubernetes version is 8.3.4. When I manually
deploy the SparkApplication resource to the cluster (via kubectl), the
application works fine and completes without errors. When I schedule the DAG,
it starts fine and runs without interruption almost to the end, but then fails
with a 404 error. It seems it cannot communicate with the driver. However,
when I check the cluster, the driver pod is still there and looks fine.
The error text:
The error text:
```
[2024-09-10, 10:16:22 UTC] {taskinstance.py:3301} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 293, in execute
    return super().execute(context=context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 634, in execute_sync
    self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 727, in await_xcom_sidecar_container_start
    if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 645, in container_is_running
    remote_pod = self.read_pod(pod)
                 ^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 418, in exc_check
    raise retry_exc.reraise()
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 185, in reraise
    raise self.last_attempt.result()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 720, in read_pod
    return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23693, in read_namespaced_pod
    return self.read_namespaced_pod_with_http_info(name, namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", line 23780, in read_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b75ef5bb-a810-46c8-a052-6b42307c5229', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '034ee985-68e4-4ac9-ba26-c2b9e6b4cd1d', 'X-Kubernetes-Pf-Prioritylevel-Uid': '2e7745b6-a7a8-493c-99ec-ce7f9c416cff', 'Date': 'Tue, 10 Sep 2024 10:16:22 GMT', 'Content-Length': '270'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"<my-spark-app>-task-qyzuffwi-driver\" not found","reason":"NotFound","details":{"name":"<my-spark-app>-task-qyzuffwi-driver","kind":"pods"},"code":404}
[2024-09-10, 10:16:22 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=<my-spark-app>, task_id=<my-spark-app>-task, run_id=scheduled__2024-09-10T09:15:00+00:00, execution_date=20240910T091500, start_date=20240910T091612, end_date=20240910T101622
```
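Per the traceback, the failure happens inside `await_xcom_sidecar_container_start`: because `do_xcom_push=True`, the operator re-reads the driver pod to check the xcom sidecar, and the API server answers 404 for that pod name. The snippet below is a minimal stand-in sketch of that failure mode and a 404-tolerant read; `ApiException` and `StubClient` are hypothetical stubs modeled loosely on `kubernetes.client`, not the real library:

```python
# Hypothetical sketch only: stubs standing in for the kubernetes client,
# illustrating a pod read that treats 404 as "pod gone" instead of failing.

class ApiException(Exception):
    """Stub modeled loosely on kubernetes.client.exceptions.ApiException."""
    def __init__(self, status):
        self.status = status
        super().__init__(f"({status})")

class StubClient:
    """Fake CoreV1Api keeping pods in a dict keyed by (namespace, name)."""
    def __init__(self, pods):
        self._pods = pods

    def read_namespaced_pod(self, name, namespace):
        try:
            return self._pods[(namespace, name)]
        except KeyError:
            # The real client raises ApiException(404) for a missing pod.
            raise ApiException(404) from None

def read_pod_or_none(client, name, namespace):
    """Return the pod, or None when the API server says it no longer exists."""
    try:
        return client.read_namespaced_pod(name, namespace)
    except ApiException as exc:
        if exc.status == 404:
            return None  # driver pod not found; treat as a terminal state
        raise

client = StubClient({("default", "<my-spark-app>-driver"): {"phase": "Running"}})
running = read_pod_or_none(client, "<my-spark-app>-driver", "default")
gone = read_pod_or_none(client, "<my-spark-app>-task-qyzuffwi-driver", "default")
```

In the failing run above, the provider hits the equivalent of `gone`: the retried `read_pod` re-raises the 404 instead of returning, and the task is marked FAILED even though the SparkApplication itself completed.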
### What you think should happen instead?
The DAG run should finish with the "success" status.
### How to reproduce
Please keep in mind, that all the values in the angle brackets are not real,
they were changed for security reasons. If you need any other information, e.g.
any Airflow config values, please let me know.
DAG file looks like this:
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

default_args = {
    'owner': '<me>',
    'depends_on_past': False,
    'start_date': '2024-09-06',
    'email': ['<my@mail>'],
    'email_on_failure': True,
    'email_on_retry': False,
    'max_active_runs': 1,
    'retries': 0,
    # Controls the number of concurrent running task instances across dag_runs per task.
    'max_active_tis_per_dag': 1,
    # 'catchup': False
}

with DAG(
    '<my-spark-app>',
    default_args=default_args,
    schedule_interval="*/10 * * * *",
    tags=['<my-spark-app>'],
) as dag:
    spark_task = SparkKubernetesOperator(
        task_id="<my-spark-app>-task",
        application_file='<my-spark-app>.yaml',
        dag=dag,
        namespace='default',
        kubernetes_conn_id='<EKS_CONN>',
        do_xcom_push=True,
        params={"app_name": "<my-spark-app>"},
    )
    sensor = SparkKubernetesSensor(
        task_id='<my-spark-app>-monitor',
        namespace="default",
        application_name="{{ task_instance.xcom_pull(task_ids='<my-spark-app>-task')['metadata']['name'] }}",
        kubernetes_conn_id="<EKS_CONN>",
        dag=dag,
        api_group="sparkoperator.k8s.io",
        api_version='v1beta2',
        attach_log=True,
    )

    spark_task >> sensor
```
SparkApplication resource YAML looks like this:
```
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  labels: &Labels
    app: <my-spark-app>
  name: <my-spark-app>
spec:
  type: Python
  restartPolicy:
    type: Never
  pythonVersion: "3"
  sparkVersion: "3.5.0"
  mode: cluster
  image: <my-ecr-repo>:<my-spark-app>
  mainApplicationFile: local:///<my-spark-app>/main.py
  sparkConf:
    spark.ui.port: "4040"
    spark.ui.showConsoleProgress: "true"
    spark.sql.broadcastTimeout: "6000"
    spark.hadoop.fs.s3a.multiobjectdelete.enable: "false"
    spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled: "true"
    spark.metrics.namespace: '<my-spark-app>-qa'
    spark.metrics.conf.*.sink.graphite.host: <graphite-exporter-address>
    spark.metrics.conf.*.sink.graphite.port: "<graphite-port>"
    spark.metrics.conf.*.sink.graphite.class: "org.apache.spark.metrics.sink.GraphiteSink"
    spark.metrics.conf.*.sink.graphite.period: "10"
    spark.metrics.conf.*.sink.graphite.unit: "seconds"
    spark.metrics.appStatusSource.enabled: "true"
    spark.driver.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.executor.extraJavaOptions: "-Dlog4j2.configurationFile=file:///<my-spark-app>/log4j2.properties -Dgraylog_host=<graylog-server> -Dgraylog_port=<graylog-port> -Dgraylog_app=<my-spark-app> -Dlog4j2.debug=false"
    spark.metrics.conf.driver.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.executor.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.worker.source.jvm.class: org.apache.spark.metrics.source.JvmSource
    spark.metrics.conf.master.source.jvm.class: org.apache.spark.metrics.source.JvmSource
  hadoopConf:
    spark.hadoop.fs.s3a.user.agent.prefix: '<my-spark-app>-qa'
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.bucket.all.committer.magic.enabled: "true"
    fs.s3a.endpoint: http://s3.eu-west-1.amazonaws.com
    fs.s3a.connection.ssl.enabled: "false"
  executor:
    nodeSelector:
      karpenter.sh/nodepool: default
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 8
    coreRequest: "7500m"
    instances: 1
    memory: "40g"
  driver:
    nodeSelector:
      karpenter.sh/nodepool: ondemand
    labels: *Labels
    serviceAccount: spark-operator-spark
    cores: 4
    memory: "16g"
    env:
      - name: DEPLOY_ENV
        value: qa
```
### Operating System
Airflow runs on EKS cluster and was deployed using Helm chart (version
1.16.0-dev)
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes -- 8.3.4
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)