cb149 opened a new issue #19752:
URL: https://github.com/apache/airflow/issues/19752


   ### Apache Airflow Provider(s)
   
   apache-spark
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-spark 2.0.1
   
   ### Apache Airflow version
   
   2.2.0
   
   ### Operating System
   
   Debian buster
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   Today, my Spark on YARN job ran longer than the configured `execution_timeout=timedelta(minutes=30)`.
   While Airflow was trying to send the kill signal, kinit failed with an error:
   
   > [2021-11-22, 17:37:57 UTC] {kerberos.py:103} ERROR - Couldn't reinit from keytab! `kinit' exited with 1.
   > kinit: Failed to store credentials: Credentials cache permissions incorrect (filename: /var/airflow_krb5_ccache) while getting initial credentials
   
   After that, the task keeps running indefinitely: no further logs are written, and the task never reaches a success or failed state.
   
   My guess is that the code that runs the YARN kill command waits forever because no timeout is set:
   ```python
   with subprocess.Popen(
       kill_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE
   ) as yarn_kill:
       self.log.info("YARN app killed with return code: %s", yarn_kill.wait())
   ```
   A timeout should be passed to `yarn_kill.wait()`.
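
A minimal sketch of what a bounded wait could look like, assuming the hang really is in the unbounded `wait()` call (that is only a guess). The function name `run_kill_command` and the 60-second default are hypothetical and are not part of the Airflow provider. The sketch uses `communicate(timeout=...)` rather than `wait(timeout=...)` because both output streams are piped, and a child that fills a pipe buffer could otherwise block as well.

```python
import subprocess
import sys


def run_kill_command(kill_cmd, timeout_seconds=60, env=None):
    """Run kill_cmd and return its exit code, or None if it timed out.

    Hypothetical helper for illustration; not the provider's implementation.
    """
    with subprocess.Popen(
        kill_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ) as proc:
        try:
            # communicate() drains stdout/stderr while waiting, so a full
            # pipe buffer cannot stall the child process.
            proc.communicate(timeout=timeout_seconds)
        except subprocess.TimeoutExpired:
            # Give up on the command: terminate it and reap the process.
            proc.kill()
            proc.communicate()
            return None
        return proc.returncode


if __name__ == "__main__":
    # Stand-in for a hanging kill command: sleeps longer than the timeout.
    slow_cmd = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_kill_command(slow_cmd, timeout_seconds=1))
```

With something like this, the caller can log the timeout and let the task fail instead of blocking forever.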
   
   ### What you expected to happen
   
   If sending the kill signal to spark-submit fails, the task should fail or time out at some point instead of running indefinitely.
   
   ### How to reproduce
   
   Use `SparkSubmitOperator` with deploy mode `cluster` and master `yarn`, an `execution_timeout` shorter than the Spark job needs, and a Kerberos ccache directory that is not writable.
   
   ### Anything else
   
   The following log line is written far too often:

   > [2021-11-22, 17:37:57 UTC] {spark_submit.py:499} INFO - Identified spark driver id:

   Maybe it would make sense to change:
   
   ```python
                   if match:
                       self._yarn_application_id = match.groups()[0]
                       self.log.info("Identified spark driver id: %s", self._yarn_application_id)
   ```
   
   to something like
   
   ```python
                   if match and (
                       not self._yarn_application_id
                       or self._yarn_application_id != match.groups()[0]
                   ):
                       self._yarn_application_id = match.groups()[0]
                       self.log.info("Identified spark driver id: %s", self._yarn_application_id)
   ```
   This would write the log only the first time the application id is identified or when it has changed.
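
The idea can be tried outside Airflow with a small stand-alone sketch. The class `DriverIdTracker` and the regular expression are hypothetical stand-ins, and the pattern the real hook uses may differ. Because a successful match is never empty, comparing it with the stored id covers both the first-time case (stored id is `None`) and the changed case, so the extra `not self._yarn_application_id` check is not strictly needed.

```python
import logging
import re

log = logging.getLogger(__name__)

# Hypothetical pattern for illustration; the real hook's regex may differ.
APP_ID_PATTERN = re.compile(r"(application[0-9_]+)")


class DriverIdTracker:
    """Remembers the last application id seen and logs only when it changes."""

    def __init__(self):
        self.yarn_application_id = None

    def process_line(self, line):
        """Return True if a new or changed application id was recorded."""
        match = APP_ID_PATTERN.search(line)
        if match and match.group(1) != self.yarn_application_id:
            self.yarn_application_id = match.group(1)
            log.info("Identified spark driver id: %s", self.yarn_application_id)
            return True
        return False


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    tracker = DriverIdTracker()
    for output_line in [
        "Submitted application_1637_0001",
        "Report for application_1637_0001 (state: RUNNING)",
        "Report for application_1637_0001 (state: RUNNING)",
    ]:
        tracker.process_line(output_line)
```

Run directly, the loop feeds three lines containing the same id, and only the first one produces a log entry.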
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   



