lyso opened a new issue, #26379:
URL: https://github.com/apache/airflow/issues/26379
### Apache Airflow Provider(s)
amazon
### Versions of Apache Airflow Providers
_No response_
### Apache Airflow version
2.2.2
### Operating System
centos rhel fedora
### Deployment
MWAA
### Deployment details
v2.2.2
### What happened
When using SparkKubernetesSensor, the task fails if the pod runs for more
than 20 minutes.
Error message raised at the 21st minute:
```
HTTP response body:
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
```
The assumed root cause is that the credentials expire after 20 minutes,
which causes the 401 error.
### What you think should happen instead
The sensor should renew the credentials on every poke call, or catch the
credential-expiry exception and renew the credentials.
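The "catch and renew" option could be sketched in plain Python roughly as follows. This is a minimal illustration of the retry idea only, not the Airflow API: `UnauthorizedError`, `make_client`, and `fetch_state` are hypothetical stand-ins for the Kubernetes client's 401 `ApiException`, hook construction, and the `get_custom_object` call.

```python
class UnauthorizedError(Exception):
    """Hypothetical stand-in for the Kubernetes client's 401 ApiException."""


def poke_with_renewal(make_client, fetch_state):
    """Call fetch_state(client); on a 401, rebuild the client and retry once.

    make_client: zero-arg factory returning a fresh (re-authenticated) client.
    fetch_state: callable(client) -> application state, may raise
                 UnauthorizedError when credentials have expired.
    """
    client = make_client()
    try:
        return fetch_state(client)
    except UnauthorizedError:
        # Credentials expired: build a new client (fresh token) and retry.
        client = make_client()
        return fetch_state(client)
```

A sensor's `poke` could wrap its API call this way, so a single expired-token 401 triggers one re-authentication instead of failing the task.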
### How to reproduce
This is easy to reproduce; it always fails at the 21st minute.
1. Prepare an EKS cluster where a Spark job can run for more than 20 minutes.
2. Use the following DAG to trigger the Spark job.
```python
from airflow.models.dag import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

with DAG(...):
    task1 = SparkKubernetesOperator(...)
    task2 = SparkKubernetesSensor(...)
    task1 >> task2
```
### Anything else
I found a workaround for this issue: override the poke function in my own
class, which inherits from SparkKubernetesSensor.
```python
from airflow.models.dag import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import (
    SparkKubernetesSensor,
    KubernetesHook,
    AirflowException,
)


class MySparkKubernetesSensor(SparkKubernetesSensor):
    def poke(self, context: dict) -> bool:
        self.log.info("Poking: %s", self.application_name)
        # Only change here: create a new hook on every poke instead of
        # reusing the same hook, so credentials are refreshed.
        response = KubernetesHook(conn_id=self.kubernetes_conn_id).get_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural="sparkapplications",
            name=self.application_name,
            namespace=self.namespace,
        )
        try:
            application_state = response["status"]["applicationState"]["state"]
        except KeyError:
            return False
        if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
            self._log_driver(application_state, response)
        if application_state in self.FAILURE_STATES:
            raise AirflowException(f"Spark application failed with state: {application_state}")
        elif application_state in self.SUCCESS_STATES:
            self.log.info("Spark application ended successfully")
            return True
        else:
            self.log.info("Spark application is still in state: %s", application_state)
            return False


# Put the class above in the DAG file, then use MySparkKubernetesSensor in
# place of SparkKubernetesSensor, like below:
with DAG(...):
    task1 = SparkKubernetesOperator(...)
    task2 = MySparkKubernetesSensor(...)
    task1 >> task2
```
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)