PApostol opened a new issue #19897:
URL: https://github.com/apache/airflow/issues/19897


   ### Apache Airflow Provider(s)
   
   apache-spark
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-apache-spark==2.0.1
   
   ### Apache Airflow version
   
   2.1.4
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   In the file `airflow/providers/apache/spark/hooks/spark_submit.py`, the 
function 
[_process_spark_status_log](https://github.com/apache/airflow/blob/main/airflow/providers/apache/spark/hooks/spark_submit.py#L516-L535)
 iterates through a `curl` response to get the driver state of a 
`SparkSubmitOperator` task.
   
   If there is a transient network issue and no valid response comes back from the 
cluster (e.g. a timeout), there is no "driverState" in the `curl` response, 
which sets the driver state to "UNKNOWN".
   
   That state [exits the 
loop](https://github.com/apache/airflow/blob/main/airflow/providers/apache/spark/hooks/spark_submit.py#L573)
 and then makes the task go on a 
[retry](https://github.com/apache/airflow/blob/main/airflow/providers/apache/spark/hooks/spark_submit.py#L464-L467),
 while the original task is actually still in a "RUNNING" state.
   
   ### What you expected to happen
   
   I would expect the task not to go on a retry while the original task is still 
running. The function `_process_spark_status_log` should probably ensure the 
`curl` response is valid before changing the driver state, e.g. check that 
there is a "submissionId" in the response as well; otherwise leave the state 
as `None` and continue with the polling loop. A valid response would look 
something like this:
   ```
   curl http://spark-host:6066/v1/submissions/status/driver-FOO-BAR
   
   {
     "action" : "SubmissionStatusResponse",
     "driverState" : "RUNNING",
     "serverSparkVersion" : "2.4.6",
     "submissionId" : "driver-FOO-BAR",
     "success" : true,
     "workerHostPort" : "FOO:BAR",
     "workerId" : "worker-FOO-BAR-BAZ"
   }
   ```
   
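   As a rough sketch of the proposed validation (the helper name `parse_driver_state` is hypothetical; the real fix would live inside `SparkSubmitHook._process_spark_status_log`), the driver state would only be trusted when the response parses as JSON and also carries a "submissionId":

   ```python
   import json


   def parse_driver_state(response_text):
       """Return the driver state from a Spark standalone REST status
       response, or None when the response is invalid or incomplete.

       Hypothetical helper illustrating the suggested check; not the
       actual provider code.
       """
       try:
           body = json.loads(response_text)
       except ValueError:
           # Transient network issue, HTML error page, empty body, etc.
           return None
       # Only trust driverState when the response is a genuine
       # SubmissionStatusResponse, i.e. it also carries a submissionId.
       if "submissionId" not in body:
           return None
       return body.get("driverState")
   ```

   With a check like this, a truncated or non-JSON response yields `None`, so the polling loop keeps waiting instead of marking the driver "UNKNOWN" and triggering a retry.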
   ### How to reproduce
   
   Use any DAG with a `SparkSubmitOperator` task on a Spark Standalone cluster 
where you can reset the network connection, or modify the `curl` command to 
return something other than the response above.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

