lyso opened a new issue, #26379:
URL: https://github.com/apache/airflow/issues/26379

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   2.2.2
   
   ### Operating System
   
   centos rhel fedora
   
   ### Deployment
   
   MWAA
   
   ### Deployment details
   
   v2.2.2
   
   ### What happened
   
    When using SparkKubernetesSensor, the task fails if the pod runs for more 
than 20 minutes.
   
    Error message raised at the 21st minute:
   
   ```
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
   ```
    The assumed root cause is that the credential expires after 20 minutes, 
which causes the 401 error.
   
   ### What you think should happen instead
   
    The sensor should either renew the credential on every poke call, or catch 
the credential-expiration exception and renew the credential.
   
   ### How to reproduce
   
    Reproducing is easy: the task always fails at the 21st minute.
   
    1. Prepare an EKS cluster where a Spark job can run for more than 20 minutes.
    2. Use the following DAG to trigger the Spark job.
   
   ```python
   
   from airflow.models.dag import DAG
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import 
SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import 
SparkKubernetesSensor
   
   with DAG(...):
       task1 = SparkKubernetesOperator(...)
        task2 = SparkKubernetesSensor(...)
       task1 >> task2
   
   ```
   
   ### Anything else
   
    I found a workaround for this issue: override the poke function in my own 
class that inherits from SparkKubernetesSensor.
   
    ```python
    from airflow.exceptions import AirflowException
    from airflow.models.dag import DAG
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
    from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor


    class MySparkKubernetesSensor(SparkKubernetesSensor):
        def poke(self, context: dict) -> bool:
            self.log.info("Poking: %s", self.application_name)
            # Only change here: create a new hook on every poke instead of
            # reusing the same hook, so the credential is always fresh.
            response = KubernetesHook(conn_id=self.kubernetes_conn_id).get_custom_object(
                group=self.api_group,
                version=self.api_version,
                plural="sparkapplications",
                name=self.application_name,
                namespace=self.namespace,
            )
            try:
                application_state = response["status"]["applicationState"]["state"]
            except KeyError:
                return False
            if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
                self._log_driver(application_state, response)
            if application_state in self.FAILURE_STATES:
                raise AirflowException(f"Spark application failed with state: {application_state}")
            elif application_state in self.SUCCESS_STATES:
                self.log.info("Spark application ended successfully")
                return True
            else:
                self.log.info("Spark application is still in state: %s", application_state)
                return False


    # Put the class above in the DAG file, then use MySparkKubernetesSensor
    # in place of SparkKubernetesSensor, like below:

    with DAG(...):
        task1 = SparkKubernetesOperator(...)
        task2 = MySparkKubernetesSensor(...)
        task1 >> task2
    ```
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

