robbert van waardhuizen created AIRFLOW-2124:
------------------------------------------------

             Summary: Allow local mainPythonFileUri
                 Key: AIRFLOW-2124
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2124
             Project: Apache Airflow
          Issue Type: Wish
            Reporter: robbert van waardhuizen


For our workflow, we are currently transitioning from the BashOperator to the 
DataProcPySparkOperator. While rewriting the DAG we came to the conclusion that 
it is not possible to submit a (local) path as our main Python file; a Hadoop 
Compatible Filesystem (HCFS) URI is required.

Our main Python drivers are located in a Git repository. Putting our main 
Python files in a GCS bucket would require manually updating/overwriting these 
files.
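
For context, what does work today is pointing the operator's main argument at an HCFS URI such as a GCS object. A minimal sketch of that (bucket name, task_id and dag are placeholders, not our actual setup):
{code:python}
# Minimal sketch of what works today: main must be an HCFS URI (e.g. a gs:// path),
# which is why the driver files would have to be copied into a bucket by hand.
# Bucket name, task_id and dag below are placeholders.
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator

submit_driver = DataProcPySparkOperator(
    task_id='submit_main_python_driver',
    main='gs://some-bucket/jobs/main_python_driver.py',  # HCFS URI, kept in sync manually
    cluster_name=cluster_name,
    dag=dag)
{code}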

In terms of code, this works using the BashOperator:

 
{code:bash}
gcloud dataproc jobs submit pyspark \
  /usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py \
  --cluster {cluster_name}
{code}
But this cannot be replicated using the DataProcPySparkOperator:
{code:python}
DataProcPySparkOperator(
    main="/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py",
    cluster_name=cluster_name)
{code}
Error:
{code:java}
=========== Cloud Dataproc Agent Error ===========
java.lang.NullPointerException
    at sun.nio.fs.UnixPath.normalizeAndCheck(UnixPath.java:77)
    at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
    at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
    at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler.registerResourceForDownload(AbstractJobHandler.java:442)
    at com.google.cloud.hadoop.services.agent.job.PySparkJobHandler.buildCommand(PySparkJobHandler.java:93)
    at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:538)
    at com.google.cloud.hadoop.services.agent.job.AbstractJobHandler$StartDriver.call(AbstractJobHandler.java:532)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:127)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at com.google.cloud.hadoop.services.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
======== End of Cloud Dataproc Agent Error ========
{code}
What would be best practice in this case?

Is it possible to add the ability to submit a local path as the main Python file?
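
As a rough illustration of what we mean (a sketch under assumptions, not a patch proposal), the local file could be staged to GCS from within the DAG, e.g. with the existing GoogleCloudStorageHook, and the resulting URI handed to the operator. Bucket and object names below are made up:
{code:python}
# Rough sketch only, assuming a staging bucket exists and credentials are available
# via the default GCP connection. Bucket and object names are placeholders.
import os
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

LOCAL_MAIN = '/usr/local/airflow/git/airflow-dags/jobs/main_python_driver.py'
STAGING_BUCKET = 'some-staging-bucket'  # placeholder

def stage_main_to_gcs():
    """Upload the local main Python driver and return its gs:// URI."""
    hook = GoogleCloudStorageHook()
    object_name = 'jobs/' + os.path.basename(LOCAL_MAIN)
    hook.upload(bucket=STAGING_BUCKET, object=object_name, filename=LOCAL_MAIN)
    return 'gs://{}/{}'.format(STAGING_BUCKET, object_name)

# The returned URI could then be passed as main= to DataProcPySparkOperator.
# Having the operator do this automatically when main looks like a local path
# is essentially what this wish is about.
{code}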


