[ 
https://issues.apache.org/jira/browse/AIRFLOW-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17226763#comment-17226763
 ] 

MOHAMED ACHRAF BAIZ commented on AIRFLOW-6244:
----------------------------------------------

Hello, I hit the same issue and found a workaround. In
*spark_submit_hook.py* you can see that the hook only handles Spark submissions
to Kubernetes in cluster mode, not in client mode. That is why the task fails
even though the job finishes with exit code 0.

In cluster mode, the hook scans the spark-submit logs with a regex to find the
driver pod name and the exit code. In client mode, however, the machine running
Airflow launches the job and is itself the driver, so no exit code ever appears
in the logs and the check obviously cannot find one.
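Roughly, the cluster-mode log scanning works like this (a minimal sketch: the exact regexes in spark_submit_hook.py vary by Airflow version, and parse_spark_submit_line is just an illustrative name, not part of Airflow):

```python
import re

# Minimal sketch of the cluster-mode log scanning described above; the exact
# regexes in spark_submit_hook.py vary by Airflow version, and this helper
# name is made up for illustration.
def parse_spark_submit_line(line):
    """Return ('pod', name) or ('exit', code) for a matching log line, else None."""
    # Driver pod name, e.g. "pod name: airflowspark-pi-1575559692-driver"
    pod_match = re.search(r'pod name: ((.+?)-([a-z0-9]+)-driver)', line)
    if pod_match:
        return ('pod', pod_match.group(1))
    # Final exit code reported for the driver pod, e.g. "Exit code: 0"
    exit_match = re.search(r'Exit code: (\d+)', line)
    if exit_match:
        return ('exit', int(exit_match.group(1)))
    return None
```

In client mode no line ever matches the exit-code pattern, so the scraped exit code stays unset and the hook's final check misfires.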

So my solution was to patch *spark_submit_hook.py*. First, in the
*_process_spark_submit_log* method, print the log of the machine that launched
the job with a simple print call:
 
{code:python}
# If we run Kubernetes cluster mode, we want to extract the driver pod id
# from the logs so we can kill the application when we stop it unexpectedly
elif self._is_kubernetes:
    print(line)
{code}
 

Then adapt the *submit* method like this, so it stops raising when you run in
client mode:
{code:python}
if returncode or (self._is_kubernetes
                  and self._connection['deploy_mode'] != 'client'
                  and self._spark_exit_code != 0):
    raise AirflowException(
        "Cannot execute: {}. Error code is: {}.".format(
            self._mask_cmd(spark_submit_cmd), returncode
        )
    )
{code}
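Pulled out as a standalone function, the adapted condition can be checked in isolation (a sketch: the parameter names mirror the hook's attributes, but should_raise itself is a hypothetical helper, not part of Airflow):

```python
# Standalone sketch of the adapted check so it can be tested outside the hook.
# The parameter names mirror the hook's attributes; should_raise itself is a
# hypothetical helper, not part of Airflow.
def should_raise(returncode, is_kubernetes, deploy_mode, spark_exit_code):
    """Fail on a non-zero spark-submit return code, or, for Kubernetes
    cluster mode only, on a non-zero exit code scraped from the driver logs."""
    return bool(returncode) or (
        is_kubernetes
        and deploy_mode != 'client'
        and spark_exit_code != 0
    )
```

With this condition, a client-mode run that returns 0 from spark-submit no longer fails just because no exit code was found in the logs.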
 

 

PS: this solution is very customized and just a workaround, not a proper fix.

> spark operator fails tasks for kubernetes jobs, even though the job is 
> successful.
> ---------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6244
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6244
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: 1.10.6
>            Reporter: Mike Prior
>            Priority: Major
>
> When spark jobs are run using the 'spark operator', the job will complete 
> successfully, but the task is marked as failed. This only happens for 
> 'Dynamic Spark' jobs to k8s clusters. The Error code is 0.  If I change the 
> 'spark_submit_hook.py' so that the only check is the spark-submit return
> code, the task is successful. The part of the validation that fails is "In
> Kubernetes mode, also check the value # of exit code in the log, as it may
> differ". I cannot see any non-zero exit code in the logs.
> # Check spark-submit return code. In Kubernetes mode, also check the value
> # of exit code in the log, as it may differ.
> ### if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
> if returncode != 0:
>     raise AirflowException(
>         "Cannot execute: {}. Error code is: {}.".format(
>             spark_submit_cmd, returncode
>         )
>     )
>
> --------------------------------------------------------------------------------------------
> [2019-12-09 12:00:27,480] {taskinstance.py:1058} ERROR - Cannot execute: 
> ['/opt/spark/bin/spark-submit', '--master', 'k8s://https://10.1.1.1:6443', 
> '--conf', 'spark.executor.instances=2', '--conf', 'spark.executor.memory=4G', 
> '--conf', 'spark.kubernetes.driver.label.app=pyspark243-k8suser', '--conf', 
> 'spark.kubernetes.executor.label.app=pyspark243-k8suser', '--conf', 
> 'spark.kubernetes.local.dirs.tmpfs=true', '--conf', 
> 'spark.kubernetes.driver.limit.cores=2', '--conf', 
> 'spark.kubernetes.executor.limit.cores=4', '--conf', 
> 'spark.kubernetes.container.image.pullPolicy=Always', '--conf', 
> 'spark.kubernetes.container.image=myimage-spark:latest', '--conf', 
> 'spark.kubernetes.namespace=u2cradle', '--conf', 
> 'spark.kubernetes.authenticate.caCertFile=/opt/certs/e.crt', '--conf', 
> 'spark.kubernetes.authenticate.driver.serviceAccountName=u2cradle-devsa', 
> '--conf', 
> 'spark.kubernetes.authenticate.oauthToken=eyJhbGciOiJSUzI1NiIsImtpZCI6I', 
> '--conf', '--num-executors', '2', '--total-executor-cores', '1', 
> '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', 
> '--name', 'airflowspark-pi', '--class', 'org.apache.spark.examples.SparkPi', 
> '--verbose', '--deploy-mode', 'client', 
> '/app/airflow/test/spark-examples_2.11-2.4.4.jar']. Error code is: 0.
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 
> 930, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/spark_submit_operator.py",
>  line 181, in execute
>     self._hook.submit(self._application)
>   File 
> "/usr/local/lib/python3.6/site-packages/airflow/contrib/hooks/spark_submit_hook.py",
>  line 359, in submit



--
This message was sent by Atlassian Jira
(v8.3.4#803005)